我试图让 kafka 消费者在特定时间内收集消息,之后我可以手动提交已收集的消息。但是我从shopify sarama找不到可以用来提交消息或一批消息的方法或api,请帮忙
问问题
1038 次
2 回答
1
使用自动提交,您无论如何都无法完全控制它何时发生。它是周期性的,并且为您在幕后发生。如果这对您不利,您也可以随时使用ConsumerGroupSession.MarkOffset(topic string, partition int32, offset int64, metadata string)
for committing(因此即使在特定时间后)将偏移量作为一批消费消息的最后一个。
于 2020-11-25T12:09:17.457 回答
0
您可以使用Offsets Config的AutoCommit字段的 Interval 参数:
// Offsets specifies configuration for how and when to commit consumed
// offsets. This currently requires the manual use of an OffsetManager
// but will eventually be automated.
Offsets struct {
// Deprecated: CommitInterval exists for historical compatibility
// and should not be used. Please use Consumer.Offsets.AutoCommit
CommitInterval time.Duration
// AutoCommit specifies configuration for commit messages automatically.
AutoCommit struct {
// Whether or not to auto-commit updated offsets back to the broker.
// (default enabled).
Enable bool
// How frequently to commit updated offsets. Ineffective unless
// auto-commit is enabled (default 1s)
Interval time.Duration
}
例如:
// init (custom) config, enable errors and notifications
config := cluster.NewConfig()
...
// Autocommit after two minutes
config.Consumer.Offsets.AutoCommit.Interval = 2 * time.Minute
于 2020-11-25T10:30:38.567 回答