我有一个DataSet[Row]
每行都是 JSON 字符串的地方。我只想打印 JSON 流或计算每批的 JSON 流。
到目前为止,这是我的代码
val ds = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers",bootstrapServers"))
.option("subscribe", topicName)
.option("checkpointLocation", hdfsCheckPointDir)
.load();
val ds1 = ds.select(from_json(col("value").cast("string"), schema) as 'payload)
val ds2 = ds1.select($"payload.info")
val query = ds2.writeStream.outputMode("append").queryName("table").format("memory").start()
query.awaitTermination()
select * from table; -- don't see anything and there are no errors. However when I run my Kafka consumer separately (independent ofSpark I can see the data)
我的问题真的是我需要做什么才能使用结构化流打印从 Kafka 接收的数据?Kafka 中的消息是 JSON 编码的字符串,因此我将 JSON 编码的字符串转换为某种结构,并最终转换为数据集。我正在使用 Spark 2.1.0