如果我连续向 Kafka 集群发布多条消息(使用新的 Producer API),我会Future
从生产者那里为每条消息获取一个。
现在,假设我已经将我的生产者配置为拥有max.in.flight.requests.per.connection = 1
并且retries > 0
我可以等待最后一个未来并确定所有以前的都已经交付(并且按顺序)?还是我需要等待所有期货?在代码中,我可以这样做:
Producer<String, String> producer = new KafkaProducer<>(myConfig);
Future<?> f = null;
for(MessageType message : messages){
f = producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue());
}
try {
f.get();
} catch(ExecutionException e) {
//handle exception
}
而不是这个:
Producer<String, String> producer = new KafkaProducer<>(myConfig);
List<Future<?>> futureList = new ArrayList<>();
for(MessageType message : messages){
futureList.add(producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue()));
}
try {
for(Future<?> f : futureList) {
f.get();
}
} catch(ExecutionException e) {
//handle exception
}
并请放心,如果这里没有发现任何内容(来自第一个片段):
try {
f.get();
} catch(ExecutionException e) {
然后我所有的消息都按顺序存储在集群中(无论生产者是否在后台执行了任何重试),如果出现问题,那么即使它不是最后一个未来,我也会在那里得到一个异常(我是等着)那第一次遇到的问题?
还有更多奇怪的角落案例需要注意吗?