0

我正在尝试将我的 DStream api 迁移到结构化流式传输,并在如何等待或无法将微批处理与结构化流式传输相关联时犹豫不决。

在下面的代码中,我正在创建直接流并永远等待,以便我可以无限期地使用 kafka 消息。

我怎样才能在结构化流媒体中实现同样的效果?

sparkSession.streams.awaitAnyTermination 就足够了吗?

我在流式传输和结构化流式传输中都放了一个示例代码。任何指示都会很有帮助。谢谢

val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "localhost:9092",
        "key.deserializer" -> classOf[StringDeserializer], 
        "value.deserializer" -> classOf[StringDeserializer],
        "auto.offset.reset" -> "latest",
        "max.poll.records" -> "1",
        "group.id" -> "test",
        "enable.auto.commit" -> (true: java.lang.Boolean))
val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(10))
      val stream = KafkaUtils.createDirectStream[String, String](ssc,  PreferConsistent,Subscribe[String, String]("mytopic",kafkaParams))

performRddComputation(stream, sparkSession)

 ssc.start()
 ssc.awaitTermination()

结构化流式等效

val df = sparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("kafkfa.offset.strategy","latest")
      .option("subscribe", "mytopic")
      .load()
      df.printSchema()

      val tdf = df.selectExpr("CAST(value AS STRING)").as[String].select("value").writeStream.format("console")
    .option("truncate","false")
    .start()


    tdf.map(record =>  {//do something})

      sparkSession.streams.awaitAnyTermination
4

2 回答 2

1

如果您只有一个查询,只需awaitTermination在查询上使用:

val df = sparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("kafkfa.offset.strategy","latest")
      .option("subscribe", "mytopic")
      .load()
      df.printSchema()

val tdf = df.selectExpr("CAST(value AS STRING)").as[String]
    .select("value")
    .map(record =>  {//do something})
    .writeStream
    .format("console")
    .option("truncate","false")
    .start()

// do something

tdf.awaitTermination()

awaitTermination是一个阻塞调用,所以你之后写的任何东西都只会在查询终止后被调用。

如果您需要处理多个查询,您可以awaitAnyTermination使用StreamingQueryManager

sparkSession.streams.awaitAnyTermination()

如果即使其中一个查询失败,您也想保持应用程序运行,然后awaitAnyTermination()resetTerminated()循环中调用。

于 2019-04-18T09:06:05.477 回答
1

我将发布一个适合我的版本:

val df = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafkfa.offset.strategy","latest")
  .option("subscribe", "mytopic")
  .load()
  //df.printSchema()

  val tdf = df.selectExpr("CAST(value AS STRING)")
    .select("value")
    .writeStream
    .outputMode("append")
    .format("console")
    .option("truncate","false")
    .start()
  tdf.awaitAnyTermination()

它应该适合你

于 2019-04-15T15:48:13.830 回答