我使用 Spark 2.1.1。
我有以下DataSet<Row>
ds1;
name | ratio | count // column names
"hello" | 1.56 | 34
(ds1.isStreaming
给true
)
我正在尝试生成DataSet<String>
ds2。换句话说,当我写信给卡夫卡水槽时,我想写这样的东西
{"name": "hello", "ratio": 1.56, "count": 34}
我已经尝试过这样的事情,df2.toJSON().writeStream().foreach(new KafkaSink()).start()
但是它给出了以下错误
Queries with streaming sources must be executed with writeStream.start()
有to_json
,json_tuple
但是我不确定如何在这里利用它们?
我尝试了以下使用json_tuple()
功能
Dataset<String> df4 = df3.select(json_tuple(new Column("result"), " name", "ratio", "count")).as(Encoders.STRING());
我收到以下错误:
无法解析“
result
”给定的输入列:[名称、比率、计数];;