0

我正在尝试将消息写入 Kafka,下面是我的生产者,如果我使用生产它有 DeliveryHandler 并且我可以访问 DeliveryReport,但是当我使用ProduceAsync时返回类型是 DeliveryResult 我如何获取 DeliveryReport 并记录失败的原因

使用生产:

    public void WriteMessage(string message)
    {
        using (var producer = new ProducerBuilder<string, string>(this._config).Build())
        {
            producer.Produce(this._topicName, new Message<string, string>()
            {
                Key = rand.Next(5).ToString(),
                Value = message
            },
            (deliveryReport) =>
            {
                if (deliveryReport.Error.Code != ErrorCode.NoError)
                {
                    Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                }
                else
                {
                    Console.WriteLine($"KAFKA => Delivered '{deliveryReport.Value}' to '{deliveryReport.TopicPartitionOffset}'");
                }
            });

            producer.Flush(TimeSpan.FromSeconds(10));
        }
    }

在上面的代码中,我可以访问继承 DeliveryResult 的 DeliveryReport,并且可以访问 Error Reason 和 DeliveryResult --> TopicPartitionOffset,以下是元数据:

namespace Confluent.Kafka
{
    //
    // Summary:
    //     The result of a produce request.
    public class DeliveryReport<TKey, TValue> : DeliveryResult<TKey, TValue>
    {
        public DeliveryReport();

        //
        // Summary:
        //     An error (or NoError) associated with the message.
        public Error Error { get; set; }
        //
        // Summary:
        //     The TopicPartitionOffsetError associated with the message.
        public TopicPartitionOffsetError TopicPartitionOffsetError { get; set; }
    }
}

使用 ProduceAsync

    public async Task WriteAysncMessage(string message)
    {
        using (var producer = new ProducerBuilder<string, string>(this._config).Build())
        {

            var deliveryReport = await producer.ProduceAsync(this._topicName, new Message<string, string>()
            {
                Key = rand.Next(5).ToString(),
                Value = message
            });

            
            producer.Flush(TimeSpan.FromSeconds(60));
        }
    }

在上述方法中,使用 ProducerAsync 时,我如何访问 DeliveryReport 以记录错误原因,就像 Produce 一样,当我在 ProducerAsync 上等待时,它返回 DeliveryResult 但不返回 DeliveryReport

此外,在写入 Kafka 时使用 Produce 或 ProduceAsync 也很好。

4

1 回答 1

0

我想我得到了解决方案:

        using (var producer = new ProducerBuilder<string, string>(this._config).Build())
        {
            try
            {
                var deliveryResult = await producer.ProduceAsync(this._topicName, new Message<string, string>()
                {
                    Key = rand.Next(5).ToString(),
                    Value = message
                });

                Console.WriteLine($"KAFKA => Delivered '{deliveryResult.Value}' to '{deliveryResult.TopicPartitionOffset}'");
            }
            catch (ProduceException<string, string> e)
            {
                Console.WriteLine($"Failed to deliver message: {e.Error.Reason}");
            }
            producer.Flush(TimeSpan.FromSeconds(60));
        }
于 2020-09-17T23:06:38.253 回答