4

我有一个 Spark 结构化流:

val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .option("subscribe", "topic")
      .load()

我想使用 DataStreamWriter 将数据写入 FileSystem,

val query = df
          .writeStream
          .outputMode("append")
          .format("parquet")
          .start("data")

但是在文件夹中创建了零个文件data。只有_spark_metadata正在被创建。

但是,我可以在控制台上看到数据format是什么时候console

val query = df
          .writeStream
          .outputMode("append")
          .format("console")
          .start()

+--------------------+------------------+------------------+
|                time|              col1|              col2|
+--------------------+------------------+------------------+
|49368-05-11 20:42...|0.9166470338147503|0.5576946794171861|
+--------------------+------------------+------------------+

我无法理解其背后的原因。

火花 - 2.1.0

4

2 回答 2

4

我遇到了类似的问题,但出于不同的原因,在这里发布以防有人遇到同样的问题。在使用水印以附加模式将输出流写入文件时,结构化流有一个有趣的行为,它实际上不会写入任何数据,直到时间桶早于水印时间。如果您正在测试结构化流式传输并且有一个小时的水位标记,那么您将至少在一个小时内看不到任何输出。

于 2018-03-03T19:54:51.983 回答
2

我解决了这个问题。实际上,当我尝试在 上运行结构化流时,它给出了一个在流查询中无效spark-shell的错误,即:endingOffsets

val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .option("subscribe", "topic")
      .load()


java.lang.IllegalArgumentException: ending offset not valid in streaming queries
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$$anonfun$validateStreamOptions$1.apply(KafkaSourceProvider.scala:374)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$$anonfun$validateStreamOptions$1.apply(KafkaSourceProvider.scala:373)
  at scala.Option.map(Option.scala:146)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateStreamOptions(KafkaSourceProvider.scala:373)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:60)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:199)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
  at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
  ... 48 elided

所以,我endingOffsets从流式查询中删除了。

val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("startingOffsets", "earliest")
      .option("subscribe", "topic")
      .load()

然后我尝试将流式查询的结果保存在 Parquet 文件中,在此期间我知道 - 必须指定检查点位置,即:

val query = df
          .writeStream
          .outputMode("append")
          .format("parquet")
          .start("data")

org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);
  at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:207)
  at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:204)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:203)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:206)
  ... 48 elided

所以,我补充说checkPointLocation

val query = df
          .writeStream
          .outputMode("append")
          .format("parquet")
          .option("checkpointLocation", "checkpoint")
          .start("data")

完成这些修改后,我能够将流式查询的结果保存在 Parquet 文件中。

但是,奇怪的是,当我通过sbt应用程序运行相同的代码时,它没有抛出任何错误,但是当我通过spark-shell这些错误运行相同的代码时,却抛出了这些错误。我认为 Apache Spark 在通过sbt/ mavenapp 运行时也应该抛出这些错误。这对我来说似乎是一个错误!

于 2017-05-26T09:29:31.370 回答