2

我想使用Confluent dotnet client在应用程序启动中处理一个主题。假设以下示例:

    while (true)
    {
        try
        {
            var cr = c.Consume();
            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
        }
        catch (ConsumeException e)
        {
            Console.WriteLine($"Error occured: {e.Error.Reason}");
        }
    }

当 Kafka 中没有新消息时,c.Consume 将被阻塞。因为我想用它来启动应用程序(比如缓存预热),所以当我发现没有新消息时,我想继续我的代码。

我知道设置超时存在过载,c.Consume(timeout)但这种方法的问题是,如果您的主题中有一条消息,并且阅读消息的持续时间超过了您的超时,您会收到不希望的空输出。

4

3 回答 3

2

消费者不应该知道生产者。

现在,如果您想知道从开始消费的那一刻起您已经阅读了该主题中的所有内容,您可以:

  1. 在开始消费之前加载最新的偏移量。
  2. 然后开始消费消息。
  3. 如果消息的偏移量与您之前加载的最新偏移量相同,请停止消费。

我不是C#开发人员,但从我在 dotnet confluent 文档中读到的内容,您可以呼吁QueryWatermarkOffsets消费者获取最旧和最新的偏移量。https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Consumer.html#Confluent_Kafka_Consumer_QueryWatermarkOffsets_Confluent_Kafka_TopicPartition_

然后,在Message类上你有一个Offset访问器。所以整个事情不应该太难实现。 https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Message.html#Confluent_Kafka_Message_Offset

于 2019-03-24T05:41:40.523 回答
2

您可以使用OnPartitionEOF指示您已到达分区末尾的事件。

CancellationTokenSource source = new CancellationTokenSource();
bool isContinue = true;

c.OnPartitionEOF += (o, e) =>
    {
        Console.WriteLine($"You have reached end of partition");
        isContinue = false;
        source.Cancel();
    };    
while (isContinue)
{
    try
    {
        var cr = c.Consume(source.Token);
        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
    }
    catch (ConsumeException e)
    {
        Console.WriteLine($"Error occured: {e.Error.Reason}");
    }
}
于 2019-03-24T09:01:07.383 回答
0

我发现 Consumer.IsPartitionEOF 很有用。

于 2021-01-15T11:04:00.810 回答