0

我正在尝试ForeachWriter在 Spark 2.1 中使用接口它的接口,但我不能使用它。

4

2 回答 2

1

Spark 2.2.0 将支持它。要了解如何使用它,我建议您阅读这篇博文:https ://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache -spark-2-2.html

您可以尝试 Spark 2.2.0 RC2 [1] 或等待最终版本。

如果您不能使用 Spark 2.2.0+,另一种选择是查看此博客:

https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html

它有一个非常简单的 Kafka 水槽,也许这对你来说已经足够了。

[1] http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-Apache-Spark-2-2-0-RC2-td21497.html

于 2017-05-05T18:24:19.670 回答
0

首先要知道的是,如果您使用 Spark 结构化 Stream 并处理流数据,您将拥有一个流数据集。

话虽这么说,编写此流数据集的方法是调用ForeachWriter,你做对了..

  import org.apache.spark.sql.ForeachWriter
  val writer = new ForeachWriter[Commons.UserEvent] {
  override def open(partitionId: Long, version: Long) = true
  override def process(value: Commons.UserEvent) = {
  processRow(value)
 }
 override def close(errorOrNull: Throwable) = {}
 }

 val query =
 ds.writeStream.queryName("aggregateStructuredStream").outputMode("complete").foreach(writer).start

写入主题的函数将如下所示:

    private def processRow(value: Commons.UserEvent) = {
     /*
     *  Producer.send(topic, data)
     */
   }
于 2017-05-19T11:02:44.113 回答