0

我在我最近的 c# 项目中使用了 Confluent kafka 包。我通过以下方式创建了一个生产者:

prodConfig = new ProducerConfig { BootstrapServers = "xxx.xxx.xxx.xxx:xxx"};

foreach(msg in msglist){
    using(var producer = new ProducerBuilder<Null, string>(prodConfig).Build()){
        producer.ProduceAsync(topic, new Message<Null, string> {Value = msg});
    }
}

但问题是我的一些消息没有到达消费者。他们正在某个地方迷路。但是,如果我对生产者使用等待,那么所有消息都会被传递。如何在不等待的情况下传递我的所有消息。(我有一个分区)

4

1 回答 1

4

首先,你应该只使用一个Producer来发送你的msgList,因为为每条消息创建一个新Producer的真的很昂贵。

你可以做的是使用Produce()方法 with Flush()。与Produce()您将异步发送消息而无需等待响应。然后调用一个Flush()将阻塞,直到所有正在进行的消息都被传递。

var prodConfig = new ProducerConfig { BootstrapServers = "xxx.xxx.xxx.xxx:xxx"};
using var producer = new ProducerBuilder<Null, string>(prodConfig).Build();

foreach (var msg in msglist)
{
    producer.Produce(topic, new Message<Null, string> { Value = msg });
}

producer.Flush();

没有await,否则Flush()您可能会丢失消息,因为您的生产者可能会在所有消息传递之前被处置。

于 2020-09-18T06:48:15.880 回答