1

如您所知,发送消息有两种方式——同步和异步。

当我们使用同步模式编码时,代码如下所示

producer.send(new ProducerRecord<Long, Event>(topicName, event)).get();

从 Kafka 文档https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-中读取,它定义为下列的:

public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record)

Asynchronously send a record to a topic. Equivalent to send(record, null). See send(ProducerRecord, Callback) for details.

Specified by:
    send in interface Producer<K,V> 

所以,基本上 send() 方法返回一个 Futher,然后一旦我为这个未来使用 .get() ,它就会变成同步行为。

我的问题是,从定义上看,我没有看到异常定义,如何在同步send()下捕获异常?似乎没有定义任何例外。有人可以帮助澄清吗?

4

1 回答 1

0

如果正在发送的记录在某种程度上是无效的并且可以在与 Kafka 集群通信之前被拒绝,那么.send将立即抛出 KafkaException(例如,如果记录太大,您会得到 RecordTooLargeException)。

如果集群端有问题,比如分区不可用,你会得到一个被 ExecutionException 包裹的 KafkaException .get()

还有一些极端情况,例如抛出 BufferExhaustedException 或通用异常 - 您可能需要查看源代码(尤其是 KafkaProducer 的doSend),不幸的是文档并不完美。

请参阅该方法中源代码的评论:

            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
于 2021-02-25T07:13:29.787 回答