1
static void DeliverCallback(rd_kafka_t *rk,  void *payload, size_t len, rd_kafka_resp_err_t error_code,  void *opaque, void *msg_opaque)
{
    if (error_code != RD_KAFKA_RESP_ERR_NO_ERROR)
    {
            // todo resend the message

    }
}

我的 dr_cb 是这样的。我知道如何获取发送失败消息的内容,但如何获取主题?最优雅的方法是什么?

4

1 回答 1

2

dr_cb界面已过时且已弃用,您应该使用更丰富的界面,您可以在dr_msg_cb其中访问所有rd_kafka_message_t字段,针对您的案例rkmessage->rkt主题。

但是,您通常不应该重试在交付报告中生成失败消息,因为 librdkafka 将尽其所能在配置的约束 (message.timeout.msretries) 内生成消息,应用程序可以通过重试添加的价值不大同样的信息。

而是进行配置message.timeout.ms以适应您的业务需求,回答“尝试生成这段数据需要多长时间?”的问题,并将retries其设置为最大值(因为从应用程序的角度来看,重试次数无关紧要)。

如果重复或消息排序很关键,您还应该考虑使用幂等生产者 ( enable.idempotence=true)。

最后,librdkafkard_kafka_message_status()为每个传递或失败的消息提供了一个消息持久性指示器 API (),让应用程序知道该消息是否......:

  • 绝对不坚持
  • 可能持续存在(手动重试可能会导致重复)
  • 绝对坚持

有关librdkafka 中消息可靠性的更多信息,请参阅https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#message-reliability 。

于 2020-01-16T12:38:54.660 回答