2

我正在使用 Spark 结构化流处理来自 Kafka 队列的大量数据并进行一些繁重的 ML 计算,但我需要将结果写入 Elasticsearch。

我尝试使用ForeachWriter但无法SparkContext在其中使用,另一种选择可能是HTTP PostForeachWriter.

现在,我正在考虑编写我自己的 ElasticsearchSink。

是否有任何文档可以为 Spark 结构化流创建接收器?

4

2 回答 2

4

如果您使用的是 Spark 2.2+ 和 ES 6.x,那么有一个开箱即用的 ES sink:

df
  .writeStream
  .outputMode(OutputMode.Append())
  .format("org.elasticsearch.spark.sql") 
  .option("es.mapping.id", "mappingId")
  .start("index/type") // index/type

如果您像我一样使用 ES 5.x,您需要实现 anEsSink和 an EsSinkProvider

EsSinkProvider:

class EsSinkProvider extends StreamSinkProvider with DataSourceRegister {

  override def createSink(sqlContext: SQLContext,
                          parameters: Map[String, String],
                          partitionColumns: Seq[String],
                          outputMode: OutputMode): Sink = {

    EsSink(sqlContext, parameters, partitionColumns, outputMode)
  }

  override def shortName(): String = "my-es-sink"
}

电汇:

case class ElasticSearchSink(sqlContext: SQLContext,
                             options: Map[String, String],
                             partitionColumns: Seq[String],
                             outputMode: OutputMode)
  extends Sink {


  override def addBatch(batchId: Long, df: DataFrame): Unit = synchronized {
    val schema = data.schema
    // this ensures that the same query plan will be used
    val rdd: RDD[String] = df.queryExecution.toRdd.mapPartitions { rows =>
      val converter = CatalystTypeConverters.createToScalaConverter(schema)
      rows.map(converter(_).asInstanceOf[Row]).map(_.getAs[String](0))
    }

    // from org.elasticsearch.spark.rdd library
    EsSpark.saveJsonToEs(rdd, "index/type", Map("es.mapping.id" -> "mappingId"))
  }
}

最后,在编写流时,将此提供程序类用作format

df
  .writeStream
  .queryName("ES-Writer")
  .outputMode(OutputMode.Append())
  .format("path.to.EsProvider")
  .start()
于 2018-05-21T12:29:57.027 回答
1

你可以看看ForeachSink。它展示了如何实现 Sink 并将 DataFrame 转换为 RDD(这非常棘手并且有很大的注释)。但是,请注意 Sink API 仍然是私有的和不成熟的,将来可能会更改。

于 2017-02-23T23:06:02.867 回答