8

如果我连续向 Kafka 集群发布多条消息(使用新的 Producer API),我会Future从生产者那里为每条消息获取一个。

现在,假设我已经将我的生产者配置为拥有max.in.flight.requests.per.connection = 1并且retries > 0我可以等待最后一个未来并确定所有以前的都已经交付(并且按顺序)?还是我需要等待所有期货?在代码中,我可以这样做

Producer<String, String> producer = new KafkaProducer<>(myConfig);
Future<?> f = null;
for(MessageType message : messages){
  f = producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue());
}
try {
  f.get();
} catch(ExecutionException e) {
  //handle exception
}

而不是这个:

Producer<String, String> producer = new KafkaProducer<>(myConfig);
List<Future<?>> futureList = new ArrayList<>();
for(MessageType message : messages){
  futureList.add(producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue()));
}
try {
  for(Future<?> f : futureList) {
    f.get();
  }
} catch(ExecutionException e) {
  //handle exception
}

并请放心,如果这里没有发现任何内容(来自第一个片段):

try {
  f.get();
} catch(ExecutionException e) {

然后我所有的消息都按顺序存储在集群中(无论生产者是否在后台执行了任何重试),如果出现问题,那么即使它不是最后一个未来,我也会在那里得到一个异常(我是等着)那第一次遇到的问题?

还有更多奇怪的角落案例需要注意吗?

4

2 回答 2

2

除了 Ewen 所说的,您还可以在循环中发送完所有消息后调用flush() 。此调用将阻塞,直到所有期货都完成,因此在此之后您可以检查期货是否有任何异常。你需要坚持所有的未来才能做到这一点。

另一种方法是在您的发送中使用回调并存储任何返回的异常,如下所示。在检查异常之前,再次使用刷新可确保所有发送都已完成。

Producer<String, String> producer = new KafkaProducer<>(myConfig);
final ArrayList<Exception> exceptionList = new ArrayList<>();

for(MessageType message : messages){
  producer.send(new ProducerRecord<String, String>("myTopic", message.getKey(), message.getValue()), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
      if (exception != null) {
        exceptionList.add(exception);
      }
    }
  });
}

producer.flush();

if (!exceptionList.isEmpty()) {
  // do stuff
}
于 2017-02-01T14:24:34.197 回答
2

您可以这样做,但前提您 a) 将重试设置为无限(或实际上是无限)并且 b) 如果遇到不可重试的异常,可以丢弃数据。

多解释一下,Kafka 有两类例外。可重试异常是指再次运行可能会成功的失败。例如,NotEnoughReplicasException表示副本数量少于您的要求,因此请求被拒绝。但是如果一个失败的代理重新上线,那么你可能有足够的副本,恢复良好,如果你再次发送请求就会成功。相反, aSerializationException是不可重试的,因为我们没有理由相信如果您再次尝试序列化,结果会有所不同。

生产者重试仅适用于您遇到不可重试异常的点。因此,如果您从未遇到任何这些,请使用无限重试,并使用您提到的其他设置,一旦解决了最终的未来,就可以保证订购和成功交付。但是,由于您可能会遇到不可重试的异常,因此处理每个未来(或回调)肯定要好得多,并确保您至少在请求失败时记录一些内容。

于 2016-08-05T01:59:18.433 回答