4

我正在运行 0.8 Kafka,并使用提供的 Java API 构建生产者。
发送消息(或消息)的 API 函数返回 void。

有没有办法获取已发送消息的状态?如果它发送或失败?

这对我们来说非常重要,因为我们正在从文件中读取消息,并且我们希望在发送完所有消息后删除该文件。但是如果出现错误并且没有发送一些消息并且我删除了文件,它将导致非常重要的数据丢失。

4

1 回答 1

2

您可以将您的生产者配置为等到它从 Kafka 集群 (request.required.acks) 收到 n 个确认后,以便在删除源文件之前确保数据已正确提交。

如果您确实需要确保消息发送成功,您可能需要考虑使生产者同步的替代方法(producer.type=sync)。这样,您将能够捕获阻塞调用引发的任何异常并采取相应措施。send() 抛出的异常是 kafka.common.FailedToSendMessageException。

Kafka 的 Java API 并不理想,希望对您有所帮助。

于 2014-05-05T15:20:02.960 回答