1

当每条消息通过结构化流管道传输时,处理每条消息的“推荐”方式是什么(我在 spark 2.1.1 上,源是 Kafka 0.10.2.1)?

到目前为止,我正在查看dataframe.mapPartitions(因为我需要连接到 HBase,其客户端连接类不可序列化,因此mapPartitions)。

想法?

4

1 回答 1

2

您应该能够使用foreach输出接收器:https ://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks和https://spark.apache.org/docs/最新/结构化流媒体编程指南.html#using-foreach

即使客户端不可序列化,您也不必在ForeachWriter构造函数中打开它。只需将其保留为 None/null,并在方法中对其进行初始化,该open方法在序列化调用,但每个任务仅调用一次。

在某种伪代码中:

class HBaseForeachWriter extends ForeachWriter[MyType] {
  var client: Option[HBaseClient] = None
  def open(partitionId: Long, version: Long): Boolean = {
    client = Some(... open a client ...)
  }
  def process(record: MyType) = {
    client match {
      case None => throw Exception("shouldn't happen")
      case Some(cl) => {
        ... use cl to write record ...
      }
    }
  }
  def close(errorOrNull: Throwable): Unit = {
    client.foreach(cl => cl.close())
  }
}
于 2017-05-25T21:17:55.023 回答