我正在使用 Spark 结构化流处理来自 Kafka 队列的大量数据并进行一些繁重的 ML 计算,但我需要将结果写入 Elasticsearch。
我尝试使用ForeachWriter
但无法SparkContext
在其中使用,另一种选择可能是HTTP Post
在ForeachWriter
.
现在,我正在考虑编写我自己的 ElasticsearchSink。
是否有任何文档可以为 Spark 结构化流创建接收器?
我正在使用 Spark 结构化流处理来自 Kafka 队列的大量数据并进行一些繁重的 ML 计算,但我需要将结果写入 Elasticsearch。
我尝试使用ForeachWriter
但无法SparkContext
在其中使用,另一种选择可能是HTTP Post
在ForeachWriter
.
现在,我正在考虑编写我自己的 ElasticsearchSink。
是否有任何文档可以为 Spark 结构化流创建接收器?
如果您使用的是 Spark 2.2+ 和 ES 6.x,那么有一个开箱即用的 ES sink:
df
.writeStream
.outputMode(OutputMode.Append())
.format("org.elasticsearch.spark.sql")
.option("es.mapping.id", "mappingId")
.start("index/type") // index/type
如果您像我一样使用 ES 5.x,您需要实现 anEsSink
和 an EsSinkProvider
:
EsSinkProvider:
class EsSinkProvider extends StreamSinkProvider with DataSourceRegister {
override def createSink(sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
EsSink(sqlContext, parameters, partitionColumns, outputMode)
}
override def shortName(): String = "my-es-sink"
}
电汇:
case class ElasticSearchSink(sqlContext: SQLContext,
options: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode)
extends Sink {
override def addBatch(batchId: Long, df: DataFrame): Unit = synchronized {
val schema = data.schema
// this ensures that the same query plan will be used
val rdd: RDD[String] = df.queryExecution.toRdd.mapPartitions { rows =>
val converter = CatalystTypeConverters.createToScalaConverter(schema)
rows.map(converter(_).asInstanceOf[Row]).map(_.getAs[String](0))
}
// from org.elasticsearch.spark.rdd library
EsSpark.saveJsonToEs(rdd, "index/type", Map("es.mapping.id" -> "mappingId"))
}
}
最后,在编写流时,将此提供程序类用作format
:
df
.writeStream
.queryName("ES-Writer")
.outputMode(OutputMode.Append())
.format("path.to.EsProvider")
.start()
你可以看看ForeachSink。它展示了如何实现 Sink 并将 DataFrame 转换为 RDD(这非常棘手并且有很大的注释)。但是,请注意 Sink API 仍然是私有的和不成熟的,将来可能会更改。