
I am trying to perform an aggregation followed by a self-join on a Structured Streaming DataFrame. Suppose the DataFrame looks like this:

sourceDf.show(false)
+-----+-------+
|owner|fruits |
+-----+-------+
|Brian|apple  |
|Brian|pear   |
|Brian|melon  |
|Brian|avocado|
|Bob  |avocado|
|Bob  |apple  |
+-----+-------+

On a static DataFrame this is easy:

val aggDf = sourceDf.groupBy($"owner").agg(collect_list(col("fruits")) as "fruitsA")
sourceDf.join(aggDf, Seq("owner")).show(false)
+-----+-------+-----------------------------+
|owner|fruits |fruitsA                      |
+-----+-------+-----------------------------+
|Brian|apple  |[apple, pear, melon, avocado]|
|Brian|pear   |[apple, pear, melon, avocado]|
|Brian|melon  |[apple, pear, melon, avocado]|
|Brian|avocado|[apple, pear, melon, avocado]|
|Bob  |avocado|[avocado, apple]             |
|Bob  |apple  |[avocado, apple]             |
+-----+-------+-----------------------------+

Unfortunately, I cannot figure out how to do this on a streaming DataFrame. So I tried the following complete code, which uses Kafka as both source and sink:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructType}


object Test {

  val spark: SparkSession = SparkSession.builder().getOrCreate()
  import spark.implicits._

  val brokers = "kafka:9092"

  val inputTopic = "test.kafka.sink.input"
  val aggTopic = "test.kafka.sink.agg"
  val outputTopicSelf = "test.kafka.sink.output.self"
  val outputTopicTwo = "test.kafka.sink.output.two"

  val payloadSchema: StructType = new StructType()
    .add("owner", StringType)
    .add("fruits", StringType)

  val payloadSchemaA: StructType = new StructType()
    .add("owner", StringType)
    .add("fruitsA", StringType)

  var joinedDfSchema: StructType = _

  val sourceDf: DataFrame = Seq(
    ("Brian", "apple"),
    ("Brian", "pear"),
    ("Brian", "melon"),
    ("Brian", "avocado"),
    ("Bob", "avocado"),
    ("Bob", "apple")
  )
    .toDF("owner", "fruits")

  val additionalData: DataFrame = Seq(("Bob", "grapes")).toDF("owner", "fruits")

  def saveDfToKafka(df: DataFrame): Unit = {
    df
      .select(to_json(struct(df.columns.map(column): _*)).alias("value"))
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("topic", inputTopic)
      .save()
  }

  // save data to kafka (batch)
  saveDfToKafka(sourceDf)

  // kafka source
  val farmDF: DataFrame = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", brokers)
    .option("startingOffsets", "earliest")
    .option("subscribe", inputTopic)
    .load()
    .byteArrayToString("value")
    .withColumn("value", from_json($"value", payloadSchema))
    .expand("value")

  farmDF.printSchema()

  implicit class DFHelper(df: DataFrame) {
    def expand(column: String): DataFrame = {
      val wantedColumns = df.columns.filter(_ != column) :+ s"$column.*"
      df.select(wantedColumns.map(col): _*)
    }

    def byteArrayToString(column: String): DataFrame = {
      val selectedCols = df.columns.filter(_ != column) :+ s"CAST($column AS STRING)"
      df.selectExpr(selectedCols: _*)
    }
  }

  def testSelfAggJoinFail(): Unit = {
    // aggregated df
    val myFarmDF = farmDF
      .groupBy($"owner")
      .agg(collect_list(col("fruits")) as "fruitsA")

    // joined df
    val joinedDF = farmDF
      .join(myFarmDF.as("myFarmDF"), Seq("owner"))
      .select("owner", "fruits", "myFarmDF.fruitsA")

    joinedDfSchema = joinedDF.schema

    // stream sink
    joinedDF
      .select(to_json(struct(joinedDF.columns.map(column): _*)).alias("value"))
      .writeStream
      .outputMode("append")
      .option("kafka.bootstrap.servers", brokers)
      .option("checkpointLocation", "/data/kafka/checkpointSelf")
      .option("topic", outputTopicSelf)
      .format("kafka")
      .start()

    // let's give time to process the stream
    Thread.sleep(10000)
  }

  def testSelfAggJoin(): Unit = {
    // aggregated df
    val myFarmDF = farmDF
      .withWatermark("timestamp", "30 seconds")
      .groupBy(
        window($"timestamp", "30 seconds", "15 seconds"),
        $"owner"
      )
      .agg(collect_list(col("fruits")) as "fruitsA")
      .select("owner", "fruitsA", "window")

    // joined df
    val joinedDF = farmDF
      .as("farmDF")
      .withWatermark("timestamp", "30 seconds")
      .join(
        myFarmDF.as("myFarmDF"),
        expr(
          """
            |farmDF.owner = myFarmDF.owner AND
            |farmDF.timestamp >= myFarmDF.window.start AND
            |farmDF.timestamp <= myFarmDF.window.end
          """.stripMargin))
      .select("farmDF.owner", "farmDF.fruits", "myFarmDF.fruitsA")

    joinedDfSchema = joinedDF.schema

    // stream sink
    joinedDF
      .select(to_json(struct(joinedDF.columns.map(column): _*)).alias("value"))
      .writeStream
      .outputMode("append")
      .option("kafka.bootstrap.servers", brokers)
      .option("checkpointLocation", "/data/kafka/checkpointSelf")
      .option("topic", outputTopicSelf)
      .format("kafka")
      .start()

    // let's give time to process the stream
    Thread.sleep(10000)
  }

  def testTwoDfAggJoin(): Unit = {
    // aggregated df
    val myFarmDF = farmDF
      .withWatermark("timestamp", "30 seconds")
      .groupBy(
        $"owner"
      )
      .agg(collect_list(col("fruits")) as "fruitsA")
      .select("owner", "fruitsA")

    // save the aggregated df to kafka
    myFarmDF
      .select(to_json(struct(myFarmDF.columns.map(column):_*)).alias("value"))
      .writeStream
      .outputMode("update")
      .option("kafka.bootstrap.servers", brokers)
      .option("checkpointLocation", "/data/kafka/checkpointAgg")
      .option("topic", aggTopic)
      .format("kafka")
      .start()

    // let's give time to process the stream
    Thread.sleep(10000)

    // read the aggregated df from kafka as a stream
    val aggDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("startingOffsets", "earliest")
      .option("subscribe", aggTopic)
      .load()
      .byteArrayToString("value")
      .withColumn("value", from_json($"value", payloadSchemaA))
      .expand("value")
      .withWatermark("timestamp", "30 seconds")

    // joined df
    val joinedDF = farmDF
      .as("farmDF")
      .join(
        aggDF.as("myFarmDF"),
        expr(
          """
            |farmDF.owner = myFarmDF.owner AND
            |farmDF.timestamp >= myFarmDF.timestamp - interval 1 hour AND
            |farmDF.timestamp <= myFarmDF.timestamp + interval 1 hour
          """.stripMargin))
      .select("farmDF.owner", "myFarmDF.fruitsA", "farmDF.fruits")

    joinedDfSchema = joinedDF.schema

    // stream sink
    joinedDF
      .select(to_json(struct(joinedDF.columns.map(column):_*)).alias("value"))
      .writeStream
      .outputMode("append")
      .option("kafka.bootstrap.servers", brokers)
      .option("checkpointLocation", "/data/kafka/checkpointTwo")
      .option("topic", outputTopicTwo)
      .format("kafka")
      .start()

    // let's give time to process the stream
    Thread.sleep(10000)
  }

  def data(topic: String): DataFrame = {
    // let's read back the output topic using kafka batch
    spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topic)
      .load()
      .byteArrayToString("value")
      .withColumn("value", from_json($"value", joinedDfSchema))
      .expand("value")
  }
}

Now, if I run the first test on the streaming DataFrame:

scala> Test.testSelfAggJoinFail
org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
Project [structstojson(named_struct(owner, owner#59, fruits, fruits#60, fruitsA, fruitsA#78), Some(Etc/UTC)) AS value#96]
+- Project [owner#59, fruits#60, fruitsA#78]
   +- Project [owner#59, key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, fruits#60, fruitsA#78]
      +- Join Inner, (owner#59 = owner#82)
         :- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, value#51.owner AS owner#59, value#51.fruits AS fruits#60]
         :  +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, jsontostructs(StructField(owner,StringType,true), StructField(fruits,StringType,true), value#43, Some(Etc/UTC), true) AS value#51]
         :     +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, cast(value#30 as string) AS value#43]
         :        +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@3269e790, kafka, Map(startingOffsets -> earliest, subscribe -> test.kafka.sink.input, kafka.bootstrap.servers -> kafka:9092), [key#29, value#30, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@42eeb996,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> test.kafka.sink.input, kafka.bootstrap.servers -> kafka:9092),None), kafka, [key#22, value#23, topic#24, partition#25, offset#26L, timestamp#27, timestampType#28]
         +- SubqueryAlias myFarmDF
            +- Aggregate [owner#82], [owner#82, collect_list(fruits#83, 0, 0) AS fruitsA#78]
               +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, value#51.owner AS owner#82, value#51.fruits AS fruits#83]
                  +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, jsontostructs(StructField(owner,StringType,true), StructField(fruits,StringType,true), value#43, Some(Etc/UTC), true) AS value#51]
                     +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, cast(value#30 as string) AS value#43]
                        +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@3269e790, kafka, Map(startingOffsets -> earliest, subscribe -> test.kafka.sink.input, kafka.bootstrap.servers -> kafka:9092), [key#29, value#30, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@42eeb996,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> test.kafka.sink.input, kafka.bootstrap.servers -> kafka:9092),None), kafka, [key#22, value#23, topic#24, partition#25, offset#26L, timestamp#27, timestampType#28]

  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:110)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:235)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:299)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:296)
  at Test$.testSelfAggJoinFail(<console>:123)
  ... 51 elided

It fails with "Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark" because I do not use a watermark.

Now, if I run the second test:

Test.testSelfAggJoin

I get these warnings:

2018-09-12 16:07:33 WARN  StreamingJoinHelper:66 - Failed to extract state value watermark from condition (window#70-T30000ms.start - timestamp#139-T30000ms) due to window#70-T30000ms.start
2018-09-12 16:07:33 WARN  StreamingJoinHelper:66 - Failed to extract state value watermark from condition (timestamp#139-T30000ms - window#70-T30000ms.end) due to window#70-T30000ms.end
2018-09-12 16:07:33 WARN  StreamingJoinHelper:66 - Failed to extract state value watermark from condition (window#70-T30000ms.start - timestamp#139-T30000ms) due to window#70-T30000ms.start
2018-09-12 16:07:33 WARN  StreamingJoinHelper:66 - Failed to extract state value watermark from condition (timestamp#139-T30000ms - window#70-T30000ms.end) due to window#70-T30000ms.end

I can then check the result:

Test.data(Test.outputTopicSelf).show(false)
2018-09-12 16:08:01 WARN  NetworkClient:882 - [Consumer clientId=consumer-5, groupId=spark-kafka-relation-02f5512f-cc3c-40ad-938f-e3dfdca95f8c-driver-0] Error while fetching metadata with correlation id 2 : {test.kafka.sink.output.self=LEADER_NOT_AVAILABLE}
2018-09-12 16:08:01 WARN  NetworkClient:882 - [Consumer clientId=consumer-5, groupId=spark-kafka-relation-02f5512f-cc3c-40ad-938f-e3dfdca95f8c-driver-0] Error while fetching metadata with correlation id 6 : {test.kafka.sink.output.self=LEADER_NOT_AVAILABLE}
+---+-----+---------+------+---------+-------------+-----+------+-------+
|key|topic|partition|offset|timestamp|timestampType|owner|fruits|fruitsA|
+---+-----+---------+------+---------+-------------+-----+------+-------+
+---+-----+---------+------+---------+-------------+-----+------+-------+

It returns an empty DataFrame (perhaps because of the warnings?). I could not find a solution based on a self-join.
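
One possible contributing factor, separate from the warnings: in Append mode a windowed aggregate is only emitted once the watermark has passed the end of its window, and the watermark trails the largest event time seen so far by the configured delay. The plain-Scala sketch below (no Spark involved) works through that arithmetic for the 30-second window, 15-second slide and 30-second watermark used in testSelfAggJoin. The names Window, windowsFor, watermark and isFinal are hypothetical, times are plain seconds, and the finalization rule is a simplification of what Spark does, so this is an illustration rather than a diagnosis.

```scala
object AppendModeWatermark {
  // A half-open event-time window [start, end), in seconds.
  final case class Window(start: Long, end: Long)

  // Sliding windows of the given size and slide that contain event time t (t >= 0 assumed).
  def windowsFor(t: Long, size: Long, slide: Long): List[Window] = {
    val latestStart = (t / slide) * slide
    Iterator.iterate(latestStart)(_ - slide)
      .takeWhile(start => start > t - size)
      .map(start => Window(start, start + size))
      .toList
  }

  // Simplified watermark: largest event time seen so far minus the allowed delay.
  def watermark(maxEventTime: Long, delay: Long): Long = maxEventTime - delay

  // Assumed rule: a window's aggregate can be appended once the watermark reaches its end.
  def isFinal(w: Window, wm: Long): Boolean = w.end <= wm
}
```

Under these assumptions, an event at t = 100 s falls into the windows [90, 120) and [75, 105). If every record carries roughly that timestamp, the watermark stays near 70 s and neither window is final; only a record stamped 150 s or later would move the watermark to 120 s. With no new data arriving during the 10-second sleep, an empty output topic would be consistent with this model.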

Finally, I tried sinking the aggregation to Kafka and re-reading it as a second streaming DataFrame, as in testTwoDfAggJoin:

scala> Test.data(Test.outputTopicTwo).show(false)
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+
|key |topic                     |partition|offset|timestamp              |timestampType|owner|fruitsA                           |fruits |
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+
|null|test.kafka.sink.output.two|0        |0     |2018-09-12 16:57:04.376|0            |Brian|["avocado","apple","pear","melon"]|avocado|
|null|test.kafka.sink.output.two|0        |1     |2018-09-12 16:57:04.376|0            |Bob  |["apple","avocado"]               |apple  |
|null|test.kafka.sink.output.two|0        |2     |2018-09-12 16:57:04.38 |0            |Brian|["avocado","apple","pear","melon"]|apple  |
|null|test.kafka.sink.output.two|0        |3     |2018-09-12 16:57:04.38 |0            |Bob  |["apple","avocado"]               |avocado|
|null|test.kafka.sink.output.two|0        |4     |2018-09-12 16:57:04.381|0            |Brian|["avocado","apple","pear","melon"]|pear   |
|null|test.kafka.sink.output.two|0        |5     |2018-09-12 16:57:04.382|0            |Brian|["avocado","apple","pear","melon"]|melon  |
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+

This works (although not very efficiently, I would say), but if I add more data to the source topic:

scala> Test.saveDfToKafka(Test.additionalData)
scala> Test.data(Test.outputTopicTwo).show(false)
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+
|key |topic                     |partition|offset|timestamp              |timestampType|owner|fruitsA                           |fruits |
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+
|null|test.kafka.sink.output.two|0        |0     |2018-09-12 16:57:04.376|0            |Brian|["avocado","apple","pear","melon"]|avocado|
|null|test.kafka.sink.output.two|0        |1     |2018-09-12 16:57:04.376|0            |Bob  |["apple","avocado"]               |apple  |
|null|test.kafka.sink.output.two|0        |2     |2018-09-12 16:57:04.38 |0            |Brian|["avocado","apple","pear","melon"]|apple  |
|null|test.kafka.sink.output.two|0        |3     |2018-09-12 16:57:04.38 |0            |Bob  |["apple","avocado"]               |avocado|
|null|test.kafka.sink.output.two|0        |4     |2018-09-12 16:57:04.381|0            |Brian|["avocado","apple","pear","melon"]|pear   |
|null|test.kafka.sink.output.two|0        |5     |2018-09-12 16:57:04.382|0            |Brian|["avocado","apple","pear","melon"]|melon  |
|null|test.kafka.sink.output.two|0        |6     |2018-09-12 16:59:37.125|0            |Bob  |["apple","avocado"]               |grapes |
|null|test.kafka.sink.output.two|0        |7     |2018-09-12 16:59:40.001|0            |Bob  |["apple","avocado","grapes"]      |apple  |
|null|test.kafka.sink.output.two|0        |8     |2018-09-12 16:59:40.002|0            |Bob  |["apple","avocado","grapes"]      |avocado|
|null|test.kafka.sink.output.two|0        |9     |2018-09-12 16:59:40.002|0            |Bob  |["apple","avocado","grapes"]      |grapes |
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+

I get many more rows, probably because I had to use .outputMode("update") when sinking the aggregated DataFrame.
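
A toy model in plain Scala (not Spark) reproduces the row count above and is consistent with that guess. It assumes that an update-mode aggregation re-emits the whole list for every owner touched by a batch, and that the inner join buffers both inputs and matches each new row against everything buffered on the other side. All names here (UpdateModeJoinModel, InnerJoin, aggregate, simulate) are hypothetical, and the model ignores watermarks, time-range conditions and the Kafka round trip.

```scala
object UpdateModeJoinModel {
  final case class Fruit(owner: String, fruit: String)
  final case class Agg(owner: String, fruits: List[String])
  final case class Joined(owner: String, fruitsA: List[String], fruit: String)

  type State = Map[String, List[String]]

  // Buffers both inputs; each new row is matched against the other side's buffer.
  final class InnerJoin {
    private var left = Vector.empty[Fruit]
    private var right = Vector.empty[Agg]

    def addLeft(rows: Seq[Fruit]): Seq[Joined] = {
      val out = for (l <- rows; r <- right if r.owner == l.owner)
        yield Joined(l.owner, r.fruits, l.fruit)
      left = left ++ rows
      out
    }

    def addRight(rows: Seq[Agg]): Seq[Joined] = {
      val out = for (r <- rows; l <- left if l.owner == r.owner)
        yield Joined(l.owner, r.fruits, l.fruit)
      right = right ++ rows
      out
    }
  }

  // Update-mode aggregation: fold the batch into the state, return only the changed groups.
  def aggregate(state: State, batch: Seq[Fruit]): (State, Seq[Agg]) = {
    val next = batch.foldLeft(state) { (m, f) =>
      m.updated(f.owner, m.getOrElse(f.owner, Nil) :+ f.fruit)
    }
    (next, batch.map(_.owner).distinct.map(o => Agg(o, next(o))))
  }

  // Feeds each batch to the join as source rows first, then as updated aggregate rows.
  def simulate(batches: Seq[Seq[Fruit]]): Seq[Joined] = {
    val join = new InnerJoin
    var state: State = Map.empty
    batches.flatMap { batch =>
      val fromSource = join.addLeft(batch)
      val (next, changed) = aggregate(state, batch)
      state = next
      fromSource ++ join.addRight(changed)
    }
  }
}
```

Feeding the six original rows as one batch and ("Bob", "grapes") as a second yields ten joined rows in this model: six from the first batch, one where grapes meets Bob's old list, and three where Bob's updated list is matched against all of his buffered source rows.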

  • Is there a way to perform this aggregation without sending it back to Kafka as a separate topic?
  • If not, is it possible to modify testTwoDfAggJoin to use .outputMode("append")?

2 Answers


As of Spark 2.3, joining two streaming DataFrames is not possible when an aggregation is applied before the join.

From the Spark documentation:

Additional details on supported joins:

    Joins can be cascaded, that is, you can do df1.join(df2, ...).join(df3, ...).join(df4, ....).

    As of Spark 2.3, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.

    As of Spark 2.3, you cannot use other non-map-like operations before joins. Here are a few examples of what cannot be used.

        Cannot use streaming aggregations before joins.

        Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode before joins.

Answered 2019-06-27T13:02:25.063

I ran into a similar error message. The outputMode matters when using agg; I solved it by adding df.writeStream.outputMode(OutputMode.Update()) or df.writeStream.outputMode(OutputMode.Complete()).

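Reference:

Output modes: there are a few types of output modes.

Append mode (default) - This is the default mode, where only the new rows added to the result table since the last trigger are output to the sink. It is supported only for queries where rows added to the result table will never change. Hence this mode guarantees that each row is output only once (assuming a fault-tolerant sink). For example, queries with only select, where, map, flatMap, filter, join, etc. support Append mode.

Complete mode - The whole result table is output to the sink after every trigger. This is supported for aggregation queries.

Update mode - (Available since Spark 2.1.1) Only the rows in the result table that were updated since the last trigger are output to the sink. More information to be added in future releases.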
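
To make the difference concrete, here is a small plain-Scala model (not Spark code) of what each mode would hand to the sink, given the result table before and after a trigger. The names OutputModeModel, ResultTable, complete, update and append are hypothetical, and the append rule is a simplification: it only passes rows whose key did not exist before and ignores watermarks entirely.

```scala
object OutputModeModel {
  // Result table of the aggregation, keyed by owner.
  type ResultTable = Map[String, List[String]]

  // Complete mode: the whole result table after the trigger.
  def complete(before: ResultTable, after: ResultTable): ResultTable = after

  // Update mode: only rows that are new or whose value changed.
  def update(before: ResultTable, after: ResultTable): ResultTable =
    after.filter { case (owner, fruits) => !before.get(owner).contains(fruits) }

  // Append mode (simplified): only rows whose key was not in the table before.
  def append(before: ResultTable, after: ResultTable): ResultTable =
    after.filter { case (owner, _) => !before.contains(owner) }
}
```

If the table already holds rows for Brian and Bob and a trigger adds grapes to Bob's list, complete returns both rows, update returns only Bob's row, and append returns nothing. Bob's existing row changed, and a changed row is exactly what Append mode cannot express, which fits the error about aggregations without a watermark.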

http://blog.madhukaraphatak.com/introduction-to-spark-structured-streaming-part-3/

Answered 2018-11-13T11:53:43.850