plutolove’s diary

I love three things in this world, the sun, the moon and you. The sun for the day, the moon for the night, and you forever.

Spark SQL Execution Process

Take the following code as an example:

val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()

Parser

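First, the session's sql function is called. It invokes the parsePlan method of sqlParser to obtain a LogicalPlan. At this point the LogicalPlan is only a syntax tree: the concrete table and column information has not been bound (resolved) yet. The code is as follows: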

def sql(sqlText: String): DataFrame = withActive {
    val tracker = new QueryPlanningTracker
    val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
      sessionState.sqlParser.parsePlan(sqlText)
    }
    Dataset.ofRows(self, plan, tracker)
  }

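The base class of the parser is AbstractSqlParser, which implements the parsePlan method; the concrete parser type is SparkSqlParser. The actual parsing is implemented by the AstBuilder object held by AbstractSqlParser. AstBuilder extends SqlBaseBaseVisitor, and SqlBaseBaseVisitor is code generated from sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4; see the ANTLR4 visitor pattern for details. The visit methods in AstBuilder walk the parse tree and produce the LogicalPlan.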
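To see the unresolved syntax tree for yourself, the parser can be called directly. This is a minimal sketch, assuming spark-catalyst is on the classpath; it uses CatalystSqlParser (another AbstractSqlParser subclass in Catalyst) rather than SparkSqlParser so that no session is needed, and the object name ParseOnly is made up for illustration. These are internal APIs and may differ between Spark versions.

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical helper object, not part of Spark.
object ParseOnly {
  // Runs only the parsing phase: SQL text -> unresolved LogicalPlan.
  def parse(sqlText: String): LogicalPlan = CatalystSqlParser.parsePlan(sqlText)

  def main(args: Array[String]): Unit = {
    val plan = parse("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
    // Prints the tree; relations and attributes are still unresolved here.
    println(plan.treeString)
    // The table `people` has not been looked up in any catalog yet.
    println(s"resolved = ${plan.resolved}")
  }
}
```

Parsing succeeds even though no table named people exists, which is the point: binding to real tables and columns happens later, during analysis.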

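Execution

Dataset's ofRows method creates a QueryExecution from the logical plan, which then drives execution. Note that ofRows itself only forces analysis through assertAnalyzed(). The code is as follows: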

def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan, tracker: QueryPlanningTracker)
    : DataFrame = sparkSession.withActive {
    val qe = new QueryExecution(sparkSession, logicalPlan, tracker)
    qe.assertAnalyzed()
    new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))
  }

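Because Spark relies heavily on lazy evaluation, the flow is not obvious from the call sites; you have to read the concrete implementations: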
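The effect of lazy val on the control flow can be illustrated without Spark. The following is a toy sketch in plain Scala: ToyQueryExecution and its string "plans" are hypothetical stand-ins, not Spark classes. It only shows that each phase runs on first access, in dependency order, and is then cached.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for QueryExecution; the plans are just strings.
class ToyQueryExecution(sql: String) {
  // Records the order in which phases actually run.
  val log: ArrayBuffer[String] = ArrayBuffer.empty[String]

  lazy val analyzed: String = {
    log += "analyze"
    s"Analyzed($sql)"
  }

  lazy val optimizedPlan: String = {
    val input = analyzed // forces the analysis phase first
    log += "optimize"
    s"Optimized($input)"
  }

  lazy val executedPlan: String = {
    val input = optimizedPlan // forces optimization (and analysis) first
    log += "plan"
    s"Physical($input)"
  }
}

object LazyDemo {
  def main(args: Array[String]): Unit = {
    val qe = new ToyQueryExecution("SELECT 1")
    println(qe.log.isEmpty)        // no phase has run at construction time
    println(qe.executedPlan)       // first access triggers the whole chain
    println(qe.log.mkString(", ")) // order in which the phases ran
    println(qe.executedPlan)       // second access reuses the cached value
    println(qe.log.size)           // still one entry per phase
  }
}
```

This is why reading a single field such as executedPlan in the real QueryExecution can trigger analysis, optimization, and planning as a side effect.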

// sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@transient private[sql] val logicalPlan: LogicalPlan = {
    // For various commands (like DDL) and queries with side effects, we force query execution
    // to happen right away to let these side effects take place eagerly.
    val plan = queryExecution.analyzed match {
      case c: Command => {
        LocalRelation(c.output, withAction("command", queryExecution)(_.executeCollect()))
      }
      case u @ Union(children) if children.forall(_.isInstanceOf[Command]) => {
        LocalRelation(u.output, withAction("command", queryExecution)(_.executeCollect()))
      }
      case _ => queryExecution.analyzed
    }
    if (sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)) {
      plan.setTagValue(Dataset.DATASET_ID_TAG, id)
    }
    plan
  }

 private[sql] def getRows(
      numRows: Int,
      truncate: Int): Seq[Seq[String]] = {
    val newDf = toDF()
    val castCols = newDf.logicalPlan.output.map { col =>
      // Since binary types in top-level schema fields have a specific format to print,
      // so we do not cast them to strings here.
      if (col.dataType == BinaryType) {
        Column(col)
      } else {
        Column(col).cast(StringType)
      }
    }
.....
}

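Dataset's getRows method accesses the logicalPlan shown above. For Command plans, logicalPlan runs withAction("command", queryExecution)(_.executeCollect()) to obtain the final rows. withAction involves many lazy operations; in short, it obtains the SparkPlan from queryExecution and then calls the corresponding executeCollect method, which returns Array[InternalRow].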
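The last step, going from a QueryExecution to an Array[InternalRow], can be tried by hand. This is a rough sketch, assuming a local Spark installation; the object name CollectInternalRows and the two sample rows are made up for illustration, and queryExecution is a developer-facing API, so details may vary by version.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow

// Hypothetical helper object, not part of Spark.
object CollectInternalRows {
  def collectNames(spark: SparkSession): Seq[String] = {
    import spark.implicits._
    // Made-up sample data registered under the table name used in the example query.
    Seq(("Justin", 19), ("Andy", 30)).toDF("name", "age").createOrReplaceTempView("people")

    val df = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
    // Accessing executedPlan forces the lazy planning phases; executeCollect runs the plan.
    val rows: Array[InternalRow] = df.queryExecution.executedPlan.executeCollect()
    // InternalRow stores strings as UTF8String; column 0 is `name`.
    rows.map(_.getUTF8String(0).toString).toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("collect-rows").getOrCreate()
    collectNames(spark).foreach(println)
    spark.stop()
  }
}
```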

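Optimization

The following code, in order, optimizes the logical plan, converts it into a spark plan (the physical plan), and uses prepareForExecution to turn the physical plan into an executable physical plan, on which execute is then called.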

lazy val optimizedPlan: LogicalPlan = executePhase(QueryPlanningTracker.OPTIMIZATION) {
    // clone the plan to avoid sharing the plan instance between different stages like analyzing,
    // optimizing and planning.
    val plan = sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
    // We do not want optimized plans to be re-analyzed as literals that have been constant folded
    // and such can cause issues during analysis. While `clone` should maintain the `analyzed` state
    // of the LogicalPlan, we set the plan as analyzed here as well out of paranoia.
    plan.setAnalyzed()
    plan
  }
lazy val sparkPlan: SparkPlan = {
    // We need to materialize the optimizedPlan here because sparkPlan is also tracked under
    // the planning phase
    assertOptimized()
    executePhase(QueryPlanningTracker.PLANNING) {
      // Clone the logical plan here, in case the planner rules change the states of the logical
      // plan.
      QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())
    }
  }
lazy val executedPlan: SparkPlan = {
    // We need to materialize the optimizedPlan here, before tracking the planning phase, to ensure
    // that the optimization time is not counted as part of the planning phase.
    assertOptimized()
    executePhase(QueryPlanningTracker.PLANNING) {
      // clone the plan to avoid sharing the plan instance between different stages like analyzing,
      // optimizing and planning.
      QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
    }
  }
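Each of these lazy vals is reachable from a DataFrame through df.queryExecution, so the plan can be printed after every phase. A minimal sketch, assuming a local Spark installation; the object name PlanPhases, the helper phases, and the sample data are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper object, not part of Spark.
object PlanPhases {
  // Returns (phase name, plan rendered as text) in the order the phases run.
  def phases(spark: SparkSession, sqlText: String): Seq[(String, String)] = {
    val qe = spark.sql(sqlText).queryExecution
    Seq(
      "parsed"        -> qe.logical.treeString,
      "analyzed"      -> qe.analyzed.treeString,
      "optimized"     -> qe.optimizedPlan.treeString,
      "spark plan"    -> qe.sparkPlan.treeString,
      "executed plan" -> qe.executedPlan.treeString
    )
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("plan-phases").getOrCreate()
    import spark.implicits._
    // Made-up sample data for the example query.
    Seq(("Justin", 19), ("Andy", 30)).toDF("name", "age").createOrReplaceTempView("people")

    phases(spark, "SELECT name, age FROM people WHERE age BETWEEN 13 AND 19").foreach {
      case (name, plan) => println(s"== $name ==\n$plan")
    }
    spark.stop()
  }
}
```

Comparing the printed trees side by side is usually the quickest way to see what the optimizer and the preparation rules changed.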

CodeGen

CollapseCodegenStages processes the spark plan according to whether the current stage supports codegen. If it does, the rule returns a WholeStageCodegenExec, whose execute path calls doCodeGen to generate the code that is then run.

private[execution] def preparations(
      sparkSession: SparkSession,
      adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None): Seq[Rule[SparkPlan]] = {
    // `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
    // as the original plan is hidden behind `AdaptiveSparkPlanExec`.
    adaptiveExecutionRule.toSeq ++
    Seq(
      // ...
      CollapseCodegenStages(sparkSession.sessionState.conf),
      // ...
    )
  }

// Called from the executedPlan lazy val shown earlier:
QueryExecution.prepareForExecution(preparations, sparkPlan.clone())

The generated code looks like this:

while (inputadapter_input_0.hasNext()) {
  InternalRow inputadapter_row_0 = (InternalRow) inputadapter_input_0.next();

  boolean inputadapter_isNull_0 = inputadapter_row_0.isNullAt(0);
  org.apache.spark.examples.sql.SparkSQLExample$Person inputadapter_value_0 = inputadapter_isNull_0 ?
    null : ((org.apache.spark.examples.sql.SparkSQLExample$Person) inputadapter_row_0.get(0, null));

  serializefromobject_doConsume_0(inputadapter_row_0, inputadapter_value_0, inputadapter_isNull_0);
  if (shouldStop()) return;
}
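Generated code like the above can be dumped from a running session with the helpers in org.apache.spark.sql.execution.debug. This is a rough sketch, assuming a local Spark installation; the object name ShowCodegen and the sample query are made up, and the exact text produced depends on the Spark version and on the query.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.debug._

// Hypothetical helper object, not part of Spark.
object ShowCodegen {
  // Returns the generated Java source for every WholeStageCodegen subtree in the plan.
  def generatedCode(spark: SparkSession): String = {
    // A made-up query over spark.range, chosen because Range, Project and Filter support codegen.
    val df = spark.range(0, 100).selectExpr("id * 2 AS doubled").filter("doubled > 10")
    codegenString(df.queryExecution.executedPlan)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("show-codegen").getOrCreate()
    println(generatedCode(spark))
    spark.stop()
  }
}
```

The same package also adds a debugCodegen() method to Dataset through an implicit class, which prints the code instead of returning it.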