5

我正在尝试对主题数据进行一些丰富。因此,使用 Spark 结构化流从 Kafka 接收器读取回 Kafka。

val ds = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("group.id", groupId)
      .option("subscribe", "topicname")
      .load()


val enriched = ds.select("key", "value", "topic").as[(String, String, String)].map(record => enrich(record._1,
      record._2, record._3)

val query = enriched.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("group.id", groupId)
      .option("topic", "desttopic")
      .start()

但我得到一个例外:

Exception in thread "main" java.lang.UnsupportedOperationException: Data source kafka does not support streamed writing
    at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:287)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:266)
    at kafka_bridge.KafkaBridge$.main(KafkaBridge.scala:319)
    at kafka_bridge.KafkaBridge.main(KafkaBridge.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

任何解决方法?

4

3 回答 3

3

Spark 2.1(目前是 Spark 的最新版本)没有它。下一个版本 - 2.2 - 将有 Kafka Writer,请参阅此提交

Kafka Sink 与 Kafka Writer 相同。

于 2017-03-24T10:08:00.257 回答
3

正如T. Gawęda提到的,没有 kafka 格式可以将流数据集写入 Kafka(即 Kafka 接收器)。

Spark 2.1 目前推荐的解决方案是使用foreach 运算符

foreach 操作允许对输出数据进行任意操作。从 Spark 2.1 开始,这仅适用于 Scala 和 Java。要使用它,您必须实现接口 ForeachWriter(Scala/Java 文档),该接口具有在触发器后生成作为输出的行序列时调用的方法。请注意以下要点。

于 2017-03-24T13:33:41.437 回答
0

尝试这个

ds.map(_.toString.getBytes).toDF("value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092"))
      .option("topic", topic)
      .start
      .awaitTermination()
于 2019-02-07T16:49:44.930 回答