
I have a DataSet[Row] where each row is a JSON string. I just want to print the JSON stream, or count the JSON stream per batch.

Here is my code so far:

val ds = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", topicName)
  .option("checkpointLocation", hdfsCheckPointDir)
  .load()

val ds1 = ds.select(from_json(col("value").cast("string"), schema) as 'payload)
val ds2 = ds1.select($"payload.info")
val query = ds2.writeStream.outputMode("append").queryName("table").format("memory").start()
query.awaitTermination()
select * from table; -- I don't see anything and there are no errors. However, when I run my Kafka consumer separately (independent of Spark) I can see the data.
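For what it's worth, a memory-sink table is only visible to the SparkSession that started the query, so it has to be queried from inside the same application while the stream is running. A minimal sketch (the table name `table` matches the `queryName` above):

```scala
// Query the memory sink from the same SparkSession, after giving
// the stream a moment to process its first micro-batch.
query.awaitTermination(5000) // wait up to 5 seconds instead of blocking forever
sparkSession.sql("select * from table").show(false)
```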

My question really is: what do I need to do to print the data received from Kafka using Structured Streaming? The messages in Kafka are JSON-encoded strings, so I am converting the JSON-encoded strings to some struct and eventually to a Dataset. I am using Spark 2.1.0.


1 Answer

val ds1 = ds.select(from_json(col("value").cast("string"), schema) as "payload").select($"payload.*")

That will print your data on the console.

ds1.writeStream.format("console").option("truncate", "false").start().awaitTermination()

Always use something like awaitTermination() or Thread.sleep(<time in milliseconds>) in these types of situations, otherwise the driver exits before the stream produces any output.
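Putting it together, a minimal end-to-end sketch with a console sink. The broker address, topic name, and schema fields below are placeholders, not taken from the question; replace them with the real shape of your JSON payload:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

object KafkaJsonConsole {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-json-console").getOrCreate()
    import spark.implicits._

    // Hypothetical schema: adjust to match the actual JSON messages.
    val schema = new StructType()
      .add("info", new StructType()
        .add("id", StringType)
        .add("name", StringType))

    val ds = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumption
      .option("subscribe", "myTopic")                      // assumption
      .load()

    // Kafka's value column is binary, so cast to string before parsing the JSON.
    val parsed = ds
      .select(from_json(col("value").cast("string"), schema) as "payload")
      .select($"payload.*")

    // The console sink prints every micro-batch to stdout.
    parsed.writeStream
      .format("console")
      .option("truncate", "false")
      .start()
      .awaitTermination()
  }
}
```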

Answered 2017-09-23T04:27:01.480