我正在尝试ForeachWriter
在 Spark 2.1 中使用接口它的接口,但我不能使用它。
问问题
1403 次
2 回答
1
Spark 2.2.0 将支持它。要了解如何使用它,我建议您阅读这篇博文:https ://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache -spark-2-2.html
您可以尝试 Spark 2.2.0 RC2 [1] 或等待最终版本。
如果您不能使用 Spark 2.2.0+,另一种选择是查看此博客:
它有一个非常简单的 Kafka 水槽,也许这对你来说已经足够了。
[1] http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-Apache-Spark-2-2-0-RC2-td21497.html
于 2017-05-05T18:24:19.670 回答
0
首先要知道的是,如果您使用 Spark 结构化 Stream 并处理流数据,您将拥有一个流数据集。
话虽这么说,编写此流数据集的方法是调用ForeachWriter,你做对了..
import org.apache.spark.sql.ForeachWriter
val writer = new ForeachWriter[Commons.UserEvent] {
override def open(partitionId: Long, version: Long) = true
override def process(value: Commons.UserEvent) = {
processRow(value)
}
override def close(errorOrNull: Throwable) = {}
}
val query =
ds.writeStream.queryName("aggregateStructuredStream").outputMode("complete").foreach(writer).start
写入主题的函数将如下所示:
private def processRow(value: Commons.UserEvent) = {
/*
* Producer.send(topic, data)
*/
}
于 2017-05-19T11:02:44.113 回答