1

我正在尝试将 Spark DF(批处理 DF)写入 Kafka,我需要将数据写入特定分区。

我尝试了以下代码

myDF.write
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaProps.getBootstrapServers)
  .option("kafka.security.protocol", "SSL")
  .option("kafka.truststore.location", kafkaProps.getTrustStoreLocation)
  .option("kafka.truststore.password", kafkaProps.getTrustStorePassword)
  .option("kafka.keystore.location", kafkaProps.getKeyStoreLocation)
  .option("kafka.keystore.password", kafkaProps.getKeyStorePassword)
  .option("kafka.partitioner.class", "util.MyCustomPartitioner")
  .option("topic",kafkaProps.getTopicName)
  .save()

我正在编写的 DF 架构是

+---+---------+-----+
|key|partition|value|
+---+---------+-----+
+---+---------+-----+

我必须重新分区(到 1 个分区)“myDF”,因为我需要根据日期列对数据进行排序。

它将数据写入单个分区,但不是 DF 的“分区”列中的数据或自定义分区器返回的数据(与分区列中的值相同)。

谢谢萨蒂什

4

1 回答 1

0

根据2.4.7 文档,在 Dataframe 中使用“分区”列的功能仅适用于 3.x 版本,而不是更早的版本

但是,使用该选项kafka.partitioner.class仍然有效。Spark Structured Streaming 允许您在使用 prefix 时使用普通的 KafkaConsumer 配置kafka.,因此这也适用于版本 2.4.4。

下面的代码在 Spark 3.0.1 和 Confluent 社区版 5.5.0 上运行良好。在 Spark 2.4.4 上,“分区”列没有任何影响,但我的自定义分区程序类适用。

case class KafkaRecord(partition: Int, value: String)

val spark = SparkSession.builder()
  .appName("test")
  .master("local[*]")
  .getOrCreate()


// create DataFrame
import spark.implicits._
val df = Seq((0, "Alice"), (1, "Bob")).toDF("partition", "value").as[KafkaRecord]

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "test")
  .save()

然后您在控制台消费者中看到的内容:

# partition 0
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic test --partition 0
Alice

# partition 1
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic test --partition 1
Bob

使用自定义时也得到相同的结果Partitioner

.option("kafka.partitioner.class", "org.test.CustomPartitioner")

我的自定义分区器定义为

package org.test
class CustomPartitioner extends Partitioner {

  override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any,valueBytes: Array[Byte],cluster: Cluster): Int = {
    if (!valueBytes.isEmpty && valueBytes.map(_.toChar).mkString == "Bob") {
      0
    } else {
      1
    }
  }
}
于 2021-01-27T15:17:05.403 回答