Take the following code as an example:
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
Parser
Parsing starts with the sql function on the session, which calls sqlParser's parsePlan method to obtain a LogicalPlan. At this point the LogicalPlan is only a syntax tree: the concrete table and column information has not been resolved yet. The code is as follows:
def sql(sqlText: String): DataFrame = withActive {
  val tracker = new QueryPlanningTracker
  val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
    sessionState.sqlParser.parsePlan(sqlText)
  }
  Dataset.ofRows(self, plan, tracker)
}
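You can see for yourself that nothing is bound at parse time by inspecting the parsed plan that QueryExecution keeps around (queryExecution.logical is a public accessor; the exact tree printout varies by Spark version):

// Unresolved nodes are printed with a leading apostrophe:
spark.sql("SELECT name, age FROM people").queryExecution.logical
// 'Project ['name, 'age]
// +- 'UnresolvedRelation [people]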
The parser's base class is AbstractSqlParser, which implements the parsePlan method; the concrete parser is SparkSqlParser. The actual parsing is done by the AstBuilder object inside AbstractSqlParser. AstBuilder extends SqlBaseBaseVisitor, which is code that ANTLR4 generates from sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 (see ANTLR4's visitor pattern for background). The visitor methods in AstBuilder walk the parse tree and build the LogicalPlan.
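For readers unfamiliar with the visitor pattern, here is a minimal self-contained sketch of the idea. All names below (SqlNode, SelectNode, SqlVisitor, PlanBuilder) are hypothetical; the real base class is the generated SqlBaseBaseVisitor, and AstBuilder's overrides return actual Catalyst nodes such as Project and UnresolvedRelation:

// Hypothetical miniature of the ANTLR4 visitor pattern (not Spark's real classes).
sealed trait SqlNode
case class TableNode(name: String) extends SqlNode
case class SelectNode(columns: Seq[String], table: TableNode) extends SqlNode

// The generated base visitor has one visit method per grammar rule;
// subclasses override the rules they care about.
trait SqlVisitor[T] {
  def visitSelect(node: SelectNode): T
  def visitTable(node: TableNode): T
}

// Plays the role of AstBuilder: turns the parse tree into a plan-like value.
object PlanBuilder extends SqlVisitor[String] {
  def visitSelect(node: SelectNode): String =
    s"Project(${node.columns.mkString(", ")}, ${visitTable(node.table)})"
  def visitTable(node: TableNode): String =
    s"UnresolvedRelation(${node.name})"
}

// "SELECT name, age FROM people" as a tiny parse tree:
PlanBuilder.visitSelect(SelectNode(Seq("name", "age"), TableNode("people")))
// => Project(name, age, UnresolvedRelation(people))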
Execution
Dataset's ofRows method builds a QueryExecution from the logical plan and runs it through analysis. The code is as follows:
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan, tracker: QueryPlanningTracker)
  : DataFrame = sparkSession.withActive {
  val qe = new QueryExecution(sparkSession, logicalPlan, tracker)
  qe.assertAnalyzed()
  new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))
}
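One consequence of the qe.assertAnalyzed() call worth noting: analysis, and hence name resolution, happens eagerly when sql() returns, so resolution errors surface immediately rather than at action time. A quick check (the exact exception message varies by version):

// Fails right here, before any action is called, because ofRows forces analysis:
spark.sql("SELECT no_such_column FROM people")
// org.apache.spark.sql.AnalysisException: cannot resolve 'no_such_column' ...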
Because Spark relies heavily on lazy evaluation, the control flow is not obvious from the code alone; you have to look at the concrete implementation:
// sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@transient private[sql] val logicalPlan: LogicalPlan = {
  // For various commands (like DDL) and queries with side effects, we force query execution
  // to happen right away to let these side effects take place eagerly.
  val plan = queryExecution.analyzed match {
    case c: Command =>
      LocalRelation(c.output, withAction("command", queryExecution)(_.executeCollect()))
    case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>
      LocalRelation(u.output, withAction("command", queryExecution)(_.executeCollect()))
    case _ =>
      queryExecution.analyzed
  }
  if (sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)) {
    plan.setTagValue(Dataset.DATASET_ID_TAG, id)
  }
  plan
}

private[sql] def getRows(
    numRows: Int,
    truncate: Int): Seq[Seq[String]] = {
  val newDf = toDF()
  val castCols = newDf.logicalPlan.output.map { col =>
    // Since binary types in top-level schema fields have a specific format to print,
    // so we do not cast them to strings here.
    if (col.dataType == BinaryType) {
      Column(col)
    } else {
      Column(col).cast(StringType)
    }
  }
  ...
}
Dataset's getRows method reads the logicalPlan defined above; for command plans, logicalPlan runs withAction("command", queryExecution)(_.executeCollect()) and the resulting rows are returned. withAction itself sits on top of many lazy vals: in short, it obtains the SparkPlan from the queryExecution and calls the corresponding executeCollect method, which returns an Array[InternalRow].
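The Command branch above explains an asymmetry you can observe directly (a sketch using standard APIs; the table name t is made up):

// DDL is a Command: it executes eagerly, as soon as sql() returns,
// because logicalPlan forces executeCollect() for Command plans.
spark.sql("CREATE TABLE t (id INT) USING parquet")

// A plain query is not a Command: nothing runs until an action.
val q = spark.sql("SELECT id FROM t WHERE id > 0")
q.show() // show() -> getRows -> ... -> execution of the physical plan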
Optimization
The following code covers, in order: optimizing the logical plan, converting it into a SparkPlan (the physical plan), and prepareForExecution turning the physical plan into an executable physical plan, which is then run via execute:
lazy val optimizedPlan: LogicalPlan = executePhase(QueryPlanningTracker.OPTIMIZATION) {
  // clone the plan to avoid sharing the plan instance between different stages like analyzing,
  // optimizing and planning.
  val plan = sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
  // We do not want optimized plans to be re-analyzed as literals that have been constant folded
  // and such can cause issues during analysis. While `clone` should maintain the `analyzed` state
  // of the LogicalPlan, we set the plan as analyzed here as well out of paranoia.
  plan.setAnalyzed()
  plan
}

lazy val sparkPlan: SparkPlan = {
  // We need to materialize the optimizedPlan here because sparkPlan is also tracked under
  // the planning phase
  assertOptimized()
  executePhase(QueryPlanningTracker.PLANNING) {
    // Clone the logical plan here, in case the planner rules change the states of the logical
    // plan.
    QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())
  }
}

lazy val executedPlan: SparkPlan = {
  // We need to materialize the optimizedPlan here, before tracking the planning phase, to ensure
  // that the optimization time is not counted as part of the planning phase.
  assertOptimized()
  executePhase(QueryPlanningTracker.PLANNING) {
    // clone the plan to avoid sharing the plan instance between different stages like analyzing,
    // optimizing and planning.
    QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
  }
}
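Since each of these is a lazy val, each phase runs at most once, and only when first touched. You can drive the phases by hand from the running example (standard QueryExecution accessors; output format varies by version):

teenagersDF.queryExecution.optimizedPlan // forces logical optimization
teenagersDF.queryExecution.sparkPlan     // forces physical planning
teenagersDF.queryExecution.executedPlan  // forces the prepareForExecution rules
teenagersDF.explain(true)                // prints parsed/analyzed/optimized/physical plans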
CodeGen
CollapseCodegenStages inspects the SparkPlan and, for each stage that supports codegen, wraps it in a WholeStageCodegenExec; WholeStageCodegenExec's execute path then calls doCodeGen to generate Java source, compile it, and run it.
private[execution] def preparations(
    sparkSession: SparkSession,
    adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None): Seq[Rule[SparkPlan]] = {
  // `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
  // as the original plan is hidden behind `AdaptiveSparkPlanExec`.
  adaptiveExecutionRule.toSeq ++
    Seq(
      ...
      CollapseCodegenStages(sparkSession.sessionState.conf),
      ...
    )
}

// executedPlan (shown above) applies these rules to the physical plan:
QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
The generated code looks like this:
while (inputadapter_input_0.hasNext()) {
  InternalRow inputadapter_row_0 = (InternalRow) inputadapter_input_0.next();
  boolean inputadapter_isNull_0 = inputadapter_row_0.isNullAt(0);
  org.apache.spark.examples.sql.SparkSQLExample$Person inputadapter_value_0 = inputadapter_isNull_0 ?
    null : ((org.apache.spark.examples.sql.SparkSQLExample$Person) inputadapter_row_0.get(0, null));
  serializefromobject_doConsume_0(inputadapter_row_0, inputadapter_value_0, inputadapter_isNull_0);
  if (shouldStop()) return;
}
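To dump this generated source for your own query, Spark ships a debugging helper in org.apache.spark.sql.execution.debug (the output will differ from the snippet above depending on the plan and version):

import org.apache.spark.sql.execution.debug._

// Prints each WholeStageCodegen subtree followed by the Java source it generated.
teenagersDF.debugCodegen()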