是的,不幸的是 Spark (1.x, 2.x) 并没有直接说明如何以有效的方式写入 Kafka。
我建议采用以下方法:
KafkaProducer
每个执行程序进程/JVM使用(和重用)一个实例。
这是此方法的高级设置:
- 首先,您必须“包装”Kafka
KafkaProducer
,因为正如您所提到的,它是不可序列化的。包装它允许您将它“运送”给执行者。这里的关键思想是使用 alazy val
以便您将实例化生产者延迟到第一次使用,这实际上是一种解决方法,因此您不必担心KafkaProducer
无法序列化。
- 您通过使用广播变量将包装的生产者“运送”到每个执行者。
- 在您的实际处理逻辑中,您通过广播变量访问包装的生产者,并使用它将处理结果写回 Kafka。
从 Spark 2.0 开始,以下代码片段适用于 Spark Streaming。
第 1 步:包装KafkaProducer
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
/* This is the key idea that allows us to work around running into
NotSerializableExceptions. */
lazy val producer = createProducer()
def send(topic: String, key: K, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, key, value))
def send(topic: String, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, value))
}
object MySparkKafkaProducer {
import scala.collection.JavaConversions._
def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
val createProducerFunc = () => {
val producer = new KafkaProducer[K, V](config)
sys.addShutdownHook {
// Ensure that, on executor JVM shutdown, the Kafka producer sends
// any buffered messages to Kafka before shutting down.
producer.close()
}
producer
}
new MySparkKafkaProducer(createProducerFunc)
}
def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)
}
第 2 步:使用广播变量为每个执行程序提供自己的包装KafkaProducer
实例
import org.apache.kafka.clients.producer.ProducerConfig
val ssc: StreamingContext = {
val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
new StreamingContext(sparkConf, Seconds(1))
}
ssc.checkpoint("checkpoint-directory")
val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
val kafkaProducerConfig = {
val p = new Properties()
p.setProperty("bootstrap.servers", "broker1:9092")
p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
p.setProperty("value.serializer", classOf[StringSerializer].getName)
p
}
ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
}
第 3 步:从 Spark Streaming 写入 Kafka,重新使用相同的包装KafkaProducer
实例(针对每个执行程序)
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata
val stream: DStream[String] = ???
stream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
kafkaProducer.value.send("my-output-topic", record)
}.toStream
metadata.foreach { metadata => metadata.get() }
}
}
希望这可以帮助。