我在这里浏览了 Spark 结构化流式传输 - Kafka 集成指南。
在这个链接上被告知
enable.auto.commit:Kafka 源不提交任何偏移量。
那么,一旦我的 spark 应用程序成功处理了每条记录,我该如何手动提交偏移量呢?
我在这里浏览了 Spark 结构化流式传输 - Kafka 集成指南。
在这个链接上被告知
enable.auto.commit:Kafka 源不提交任何偏移量。
那么,一旦我的 spark 应用程序成功处理了每条记录,我该如何手动提交偏移量呢?
tl;博士
无法向 Kafka 提交任何消息。从 Spark 版本 3.x 开始,您可以定义 Kafka 消费者组的名称,但是,这仍然不允许您提交任何消息。
根据结构化 Kafka 集成指南,您可以提供 ConsumerGroup 作为选项kafka.group.id
:
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("kafka.group.id", "myConsumerGroup")
.load()
但是,Spark 仍然不会提交任何偏移量,因此您将无法“手动”向 Kafka 提交偏移量。此功能旨在使用基于角色的访问控制来处理 Kafka 的最新功能授权,您的 ConsumerGroup 通常需要遵循命名约定。
此处讨论并解决了 Spark 3.x 应用程序的完整示例。
Spark Structured Streaming + Kafka 集成指南清楚地说明了它如何管理 Kafka 偏移量。Spark不会将任何消息提交回 Kafka,因为它依赖于内部偏移管理来实现容错。
管理偏移量的最重要的 Kafka 配置是:
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
因此,在结构化流中,目前无法为 Kafka 消费者定义您的自定义 group.id,并且结构化流在内部管理偏移量,而不是提交回 Kafka(也不是自动)。
假设您有一个简单的 Spark Structured Streaming 应用程序,它可以读取和写入 Kafka,如下所示:
// create SparkSession
val spark = SparkSession.builder()
.appName("ListenerTester")
.master("local[*]")
.getOrCreate()
// read from Kafka topic
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "testingKafkaProducer")
.option("failOnDataLoss", "false")
.load()
// write to Kafka topic and set checkpoint directory for this stream
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "testingKafkaProducerOut")
.option("checkpointLocation", "/home/.../sparkCheckpoint/")
.start()
提交此应用程序并处理数据后,可以在检查点目录中找到相应的偏移量:
myCheckpointDir/偏移量/
{"testingKafkaProducer":{"0":1}}
这里检查点文件中的条目确认0
要消费的分区的下一个偏移量是1
. 这意味着应用程序已经处理了从名为 的主题0
分区的偏移量。0
testingKafkaProducer
Spark文档中提供了有关容错语义的更多信息。
但是,如文档中所述,偏移量不会提交回 Kafka。这可以通过执行kafka-consumer-groups.sh
Kafka 安装来检查。
./kafka/current/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group "spark-kafka-source-92ea6f85-[...]-driver-0"
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testingKafkaProducer 0 - 1 - consumer-1-[...] /127.0.0.1 consumer-1
Kafka不知道该应用程序的当前偏移量,因为它从未提交过。
请仔细阅读以下 Spark 提交者 @JungtaekLim 关于解决方法的评论:“Spark 的容错保证基于 Spark 对偏移管理具有完全控制权这一事实,如果他们试图修改它,他们将取消保证。 (例如,如果他们更改为向 Kafka 提交偏移量,则没有批次信息,并且如果 Spark 需要移回特定批次,“后面”保证不再有效。)”
我在网上看到的一些研究是,您可以onQueryProgress
在自定义StreamingQueryListener
Spark 的方法的回调函数中提交偏移量。这样,您可以拥有一个跟踪当前进度的消费者组。但是,它的进展并不一定与实际的消费群体一致。
以下是一些您可能会觉得有帮助的链接: