1

我试图让 kafka 消费者在特定时间内收集消息,之后我可以手动提交已收集的消息。但是我从shopify sarama找不到可以用来提交消息或一批消息的方法或api,请帮忙

4

2 回答 2

1

使用自动提交,您无论如何都无法完全控制它何时发生。它是周期性的,并且为您在幕后发生。如果这对您不利,您也可以随时使用ConsumerGroupSession.MarkOffset(topic string, partition int32, offset int64, metadata string)for committing(因此即使在特定时间后)将偏移量作为一批消费消息的最后一个。

于 2020-11-25T12:09:17.457 回答
0

您可以使用Offsets Config的AutoCommit字段的 Interval 参数:

// Offsets specifies configuration for how and when to commit consumed
// offsets. This currently requires the manual use of an OffsetManager
// but will eventually be automated.
Offsets struct {
    // Deprecated: CommitInterval exists for historical compatibility
    // and should not be used. Please use Consumer.Offsets.AutoCommit
    CommitInterval time.Duration

    // AutoCommit specifies configuration for commit messages automatically.
    AutoCommit struct {
        // Whether or not to auto-commit updated offsets back to the broker.
        // (default enabled).
        Enable bool

        // How frequently to commit updated offsets. Ineffective unless
        // auto-commit is enabled (default 1s)
        Interval time.Duration
    }

例如:

// init (custom) config, enable errors and notifications
config := cluster.NewConfig()
...

// Autocommit after two minutes
config.Consumer.Offsets.AutoCommit.Interval = 2 * time.Minute
于 2020-11-25T10:30:38.567 回答