我有一个 Kafka ConsumerGroup (Golang/Segmentio) 和一个读者作为单元测试用例的一部分
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "test",
CommitInterval: time.Second, // flushes commits to Kafka every second
MaxWait: 10 * time.Millisecond,
GroupID: "test_logs",
StartOffset: kafka.LastOffset,
})
阅读器能够阅读消息,但在阅读了主题中的所有消息后,它正在等待新消息,并且测试用例在 30 秒后超时。
如果我设置context.WithTimeout(ctx, 20*time.Second)
,那么阅读器将无法阅读该主题的任何消息。
那么我怎样才能从主题中读取消息并继续而不是等待新消息。