最近,我开始学习使用kafka工作。我正在从事的项目使用sarama。
为了阅读我使用的消息ConsumerGroup
。
foo
如果返回,我需要在一段时间后再次阅读该消息false
。如何才能做到这一点?
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
if ok := foo(message); ok {
session.MarkMessage(message, "")
} else {
// ???
}
}
return nil
}