5

为什么select每批都打印该语句,但hello world只打印一次?

import org.apache.spark.sql.types._
val schema = StructType(
  StructField("id", LongType, nullable = false) ::
  StructField("name", StringType, nullable = false) ::
  StructField("score", DoubleType, nullable = false) :: Nil)

val in: DataFrame = sparkSession.readStream
 .schema(schema)
 .format("csv")
 .option("header", false)
 .option("maxFilesPerTrigger", 1)
 .option("delimiter", ";")
 .load("s3://xxxxxxxx")

val input: DataFrame = in.select("*")
 .transform { ds =>
   println("hello world")  // <-- Why is this printed out once?
   ds
}

import org.apache.spark.sql.streaming.StreamingQuery
val query: StreamingQuery = input.writeStream
  .format("console")
  .start
4

1 回答 1

8

Spark 2.1.0-SNAPSHOT here(今天构建)但我相信它在 2.0 和现在之间没有变化。

$ ./bin/spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-SNAPSHOT
      /_/

Branch master
Compiled by user jacek on 2016-09-30T07:08:39Z
Revision 1fad5596885aab8b32d2307c0edecbae50d5bd7a
Url https://github.com/apache/spark.git
Type --help for more information.

在 Spark 的Structured Streaming中,您的流应用程序只是将相同的物理查询计划应用于输入数据源的技巧。

请注意,物理查询计划是您的决定因素Dataset(我使用 Spark SQL 的次数越多,我就越发现查询和数据集之间没有区别——它们现在可以互换)。

当您描述结构化查询时(无论它是一次性查询还是流式查询),它会经历解析、分析、优化和最终生成物理计划的 4 个阶段。您可以使用方法查看它explain(extended = true)

scala> input.explain(extended = true)
== Parsed Logical Plan ==
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

== Analyzed Logical Plan ==
id: bigint, name: string, score: double
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

== Optimized Logical Plan ==
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

== Physical Plan ==
StreamingRelation FileSource[input-json], [id#15L, name#16, score#17]

这些阶段是惰性的,只执行一次

一旦你有了物理计划,这些阶段就不会再执行了。您的Dataset管道已经计算过了,唯一缺少的部分是流经管道的数据。

这就是为什么您只看到一次“hello world”的原因——当流式查询计划被“执行”以产生物理计划时。它执行一次并针对处理源进行了优化Dataset(并且只有Dataset这样任何副作用都已被触发)。

一个有趣的案例。把它带到这里已经很多了!

于 2016-09-13T08:27:33.700 回答