我正在尝试将我的 DStream api 迁移到结构化流式传输,并在如何等待或无法将微批处理与结构化流式传输相关联时犹豫不决。
在下面的代码中,我正在创建直接流并永远等待,以便我可以无限期地使用 kafka 消息。
我怎样才能在结构化流媒体中实现同样的效果?
sparkSession.streams.awaitAnyTermination 就足够了吗?
我在流式传输和结构化流式传输中都放了一个示例代码。任何指示都会很有帮助。谢谢
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"auto.offset.reset" -> "latest",
"max.poll.records" -> "1",
"group.id" -> "test",
"enable.auto.commit" -> (true: java.lang.Boolean))
val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(10))
val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent,Subscribe[String, String]("mytopic",kafkaParams))
performRddComputation(stream, sparkSession)
ssc.start()
ssc.awaitTermination()
结构化流式等效
val df = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("kafkfa.offset.strategy","latest")
.option("subscribe", "mytopic")
.load()
df.printSchema()
val tdf = df.selectExpr("CAST(value AS STRING)").as[String].select("value").writeStream.format("console")
.option("truncate","false")
.start()
tdf.map(record => {//do something})
sparkSession.streams.awaitAnyTermination