我正在尝试将 Spark DF(批处理 DF)写入 Kafka,我需要将数据写入特定分区。
我尝试了以下代码
myDF.write
.format("kafka")
.option("kafka.bootstrap.servers", kafkaProps.getBootstrapServers)
.option("kafka.security.protocol", "SSL")
.option("kafka.truststore.location", kafkaProps.getTrustStoreLocation)
.option("kafka.truststore.password", kafkaProps.getTrustStorePassword)
.option("kafka.keystore.location", kafkaProps.getKeyStoreLocation)
.option("kafka.keystore.password", kafkaProps.getKeyStorePassword)
.option("kafka.partitioner.class", "util.MyCustomPartitioner")
.option("topic",kafkaProps.getTopicName)
.save()
我正在编写的 DF 架构是
+---+---------+-----+
|key|partition|value|
+---+---------+-----+
+---+---------+-----+
我必须重新分区(到 1 个分区)“myDF”,因为我需要根据日期列对数据进行排序。
它将数据写入单个分区,但不是 DF 的“分区”列中的数据或自定义分区器返回的数据(与分区列中的值相同)。
谢谢萨蒂什