2

我有一个 Kafka ConsumerGroup (Golang/Segmentio) 和一个读者作为单元测试用例的一部分

r := kafka.NewReader(kafka.ReaderConfig{
        Brokers:        []string{"localhost:9092"},
        Topic:          "test",
        CommitInterval: time.Second, // flushes commits to Kafka every second
        MaxWait:        10 * time.Millisecond,
        GroupID:        "test_logs",
        StartOffset:    kafka.LastOffset,
    })

阅读器能够阅读消息,但在阅读了主题中的所有消息后,它正在等待新消息,并且测试用例在 30 秒后超时。

如果我设置context.WithTimeout(ctx, 20*time.Second),那么阅读器将无法阅读该主题的任何消息。

那么我怎样才能从主题中读取消息并继续而不是等待新消息。

4

0 回答 0