我正在开发一个使用 KafkaIO 作为输入的 Beam 应用程序
KafkaIO.<Long, GenericRecord>read()
.withBootstrapServers("bootstrapServers")
.withTopic("topicName")
.withConsumerConfigUpdates(confs)
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer((Deserializer.class)
.commitOffsetsInFinalize()
.withoutMetadata();
我试图了解它的commitOffsetsInFinalize()
工作原理。
如何完成流式传输作业?管道中的最后一步是自定义 DoFn,它将消息写入DynamoDb
. 有没有办法在那里手动调用一些finalize()
方法,以便在每次成功执行后提交偏移量DoFn
?
我也很难理解检查点和最终确定之间的关系是什么?如果管道上没有启用检查点,我仍然能够完成并开始commitOffsetsInFinalize()
工作吗?
ps 管道现在的方式,即使commitOffsetsInFinalize()
每条消息都被读取,无论下游是否有故障正在提交,因此导致数据丢失。
谢谢!