我正在尝试将消息写入 Kafka,下面是我的生产者,如果我使用生产它有 DeliveryHandler 并且我可以访问 DeliveryReport,但是当我使用ProduceAsync时返回类型是 DeliveryResult 我如何获取 DeliveryReport 并记录失败的原因
使用生产:
public void WriteMessage(string message)
{
using (var producer = new ProducerBuilder<string, string>(this._config).Build())
{
producer.Produce(this._topicName, new Message<string, string>()
{
Key = rand.Next(5).ToString(),
Value = message
},
(deliveryReport) =>
{
if (deliveryReport.Error.Code != ErrorCode.NoError)
{
Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
}
else
{
Console.WriteLine($"KAFKA => Delivered '{deliveryReport.Value}' to '{deliveryReport.TopicPartitionOffset}'");
}
});
producer.Flush(TimeSpan.FromSeconds(10));
}
}
在上面的代码中,我可以访问继承 DeliveryResult 的 DeliveryReport,并且可以访问 Error Reason 和 DeliveryResult --> TopicPartitionOffset,以下是元数据:
namespace Confluent.Kafka
{
//
// Summary:
// The result of a produce request.
public class DeliveryReport<TKey, TValue> : DeliveryResult<TKey, TValue>
{
public DeliveryReport();
//
// Summary:
// An error (or NoError) associated with the message.
public Error Error { get; set; }
//
// Summary:
// The TopicPartitionOffsetError associated with the message.
public TopicPartitionOffsetError TopicPartitionOffsetError { get; set; }
}
}
使用 ProduceAsync
public async Task WriteAysncMessage(string message)
{
using (var producer = new ProducerBuilder<string, string>(this._config).Build())
{
var deliveryReport = await producer.ProduceAsync(this._topicName, new Message<string, string>()
{
Key = rand.Next(5).ToString(),
Value = message
});
producer.Flush(TimeSpan.FromSeconds(60));
}
}
在上述方法中,使用 ProducerAsync 时,我如何访问 DeliveryReport 以记录错误原因,就像 Produce 一样,当我在 ProducerAsync 上等待时,它返回 DeliveryResult 但不返回 DeliveryReport
此外,在写入 Kafka 时使用 Produce 或 ProduceAsync 也很好。