Shopify/sarama 为Kafka Exactly Once (Idempotency) 提供了启用幂等的生产者。但是对于下面的配置设置需要在那里。
来自Shopify/sarama/config.go
if c.Producer.Idempotent {
if !c.Version.IsAtLeast(V0_11_0_0) {
return ConfigurationError("Idempotent producer requires Version >= V0_11_0_0")
}
if c.Producer.Retry.Max == 0 {
return ConfigurationError("Idempotent producer requires Producer.Retry.Max >= 1")
}
if c.Producer.RequiredAcks != WaitForAll {
return ConfigurationError("Idempotent producer requires Producer.RequiredAcks to be WaitForAll")
}
if c.Net.MaxOpenRequests > 1 {
return ConfigurationError("Idempotent producer requires Net.MaxOpenRequests to be 1")
}
}
在Shopify/sarama他们是如何做到这一点的,在's中有一个producerEpoch
ID 。您可以参考Shopify/sarama/async_producer.go中的文件。此 ID 使用生产者初始化进行初始化,并在成功生成每条消息时递增。读取函数以在文件中查看。AsyncProducer
transactionManager
bumpEpoch()
async_producer.go
这是该生产者与代理的会话的序列 ID,它与每条消息一起发送。消息发布成功时递增。
阅读这个例子。它描述了幂等性是如何工作的。
您对生产者会话事实是正确的。对于单个生产者会话,这正是曾经承诺过的。在序列失败后重新启动生产者时,可能会有重复。
当生产者重新启动时,会分配新的 PID。因此,幂等性仅被承诺为单个生产者会话。即使生产者在失败时重试请求,每条消息也会在日志中仅保留一次。根据生产者获取数据的来源,仍然可能存在重复。Kafka 不会处理生产者收到的重复数据。因此,在某些情况下,您可能需要额外的重复数据删除系统。