2

我使用 Spark 2.1.1。

我有以下DataSet<Row>ds1;

 name   | ratio | count  // column names
"hello" |  1.56 | 34 

ds1.isStreamingtrue

我正在尝试生成DataSet<String>ds2。换句话说,当我写信给卡夫卡水槽时,我想写这样的东西

{"name": "hello", "ratio": 1.56, "count": 34}

我已经尝试过这样的事情,df2.toJSON().writeStream().foreach(new KafkaSink()).start()但是它给出了以下错误

Queries with streaming sources must be executed with writeStream.start()

to_jsonjson_tuple但是我不确定如何在这里利用它们?


我尝试了以下使用json_tuple()功能

 Dataset<String> df4 = df3.select(json_tuple(new Column("result"), " name", "ratio", "count")).as(Encoders.STRING());

我收到以下错误:

无法解析“ result”给定的输入列:[名称、比率、计数];;

4

1 回答 1

5

tl;dr使用struct函数后跟to_jsontoJSON由于SPARK-17029仅在20 天前得到修复,因此流数据集已损坏)。


引用struct的scaladoc :

struct(colName: String, colNames: String*): Column创建一个包含多个输入列的新结构列。

鉴于您使用 Java API,您也有 4 种不同的struct函数变体:

public static Column struct(Column... cols)创建一个新的结构列。

使用to_json函数可以涵盖您的情况:

public static Column to_json(Column e)将包含 StructType 的列转换为具有指定模式的 JSON 字符串。

以下是 Scala 代码(将其翻译成 Java 是您的家庭练习):

val ds1 = Seq(("hello", 1.56, 34)).toDF("name", "ratio", "count")
val recordCol = to_json(struct("name", "ratio", "count")) as "record"
scala> ds1.select(recordCol).show(truncate = false)
+----------------------------------------+
|record                                  |
+----------------------------------------+
|{"name":"hello","ratio":1.56,"count":34}|
+----------------------------------------+

我还尝试了您的解决方案(使用今天构建的 Spark 2.3.0-SNAPSHOT),它似乎运行良好。

val fromKafka = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load.
  select('value cast "string")
fromKafka.
  toJSON. // <-- JSON conversion
  writeStream.
  format("console"). // using console sink
  start

format("kafka")SPARK-19719中添加,在 2.1.0 中不可用。

于 2017-05-31T09:35:06.683 回答