8

Shopify /sarama是否提供类似于transactional.idJVM API 的选项?

该库支持幂等性(Config.Producer.Idemponent,类似于enable.idempotence),但我不明白如何在没有transactional.id.

如果我错了,请纠正我,Sarama 中缺少关于这些选项的文档。但是根据 JVM 文档,没有标识符的幂等性将受到单个生产者会话的限制。换句话说,当生产者失败并重新启动时,我们将失去保证。

我在源代码和一些测试(例如)中找到了相关属性,但不明白如何在外部使用它们。

4

1 回答 1

2

Shopify/sarama 为Kafka Exactly Once (Idempotency) 提供了启用幂等的生产者。但是对于下面的配置设置需要在那里。

来自Shopify/sarama/config.go

    if c.Producer.Idempotent {
        if !c.Version.IsAtLeast(V0_11_0_0) {
            return ConfigurationError("Idempotent producer requires Version >= V0_11_0_0")
        }
        if c.Producer.Retry.Max == 0 {
            return ConfigurationError("Idempotent producer requires Producer.Retry.Max >= 1")
        }
        if c.Producer.RequiredAcks != WaitForAll {
            return ConfigurationError("Idempotent producer requires Producer.RequiredAcks to be WaitForAll")
        }
        if c.Net.MaxOpenRequests > 1 {
            return ConfigurationError("Idempotent producer requires Net.MaxOpenRequests to be 1")
        }
    }

Shopify/sarama他们是如何做到这一点的,在's中有一个producerEpochID 。您可以参考Shopify/sarama/async_producer.go中的文件。此 ID 使用生产者初始化进行初始化,并在成功生成每条消息时递增。读取函数以在文件中查看。AsyncProducertransactionManagerbumpEpoch()async_producer.go

这是该生产者与代理的会话的序列 ID,它与每条消息一起发送。消息发布成功时递增。

阅读这个例子。它描述了幂等性是如何工作的。

您对生产者会话事实是正确的。对于单个生产者会话,这正是曾经承诺过的。在序列失败后重新启动生产者时,可能会有重复。

当生产者重新启动时,会分配新的 PID。因此,幂等性仅被承诺为单个生产者会话。即使生产者在失败时重试请求,每条消息也会在日志中仅保留一次。根据生产者获取数据的来源,仍然可能存在重复。Kafka 不会处理生产者收到的重复数据。因此,在某些情况下,您可能需要额外的重复数据删除系统。

于 2021-06-09T18:27:42.850 回答