I created a KafkaProducer (reactor.kafka.sender.KafkaSender) using Reactor Kafka, a functional Java API for Kafka, with the following producer configuration:

max.block.ms = 8000
request.timeout.ms = 4000
retries = 3
retry.backoff.ms = 2000
max.in.flight.requests.per.connection = 512
acks = all
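For context, a minimal sketch of how a sender with these properties might be built through Reactor Kafka's SenderOptions; the bootstrap address and the serializer classes are assumptions, since the question does not show them:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import reactor.kafka.sender.KafkaSender;
    import reactor.kafka.sender.SenderOptions;

    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 8000);
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 4000);
    props.put(ProducerConfig.RETRIES_CONFIG, 3);
    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 2000);
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 512);
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // the question sends a custom Event value type; a JSON serializer is assumed here
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.springframework.kafka.support.serializer.JsonSerializer");

    SenderOptions<String, Event> senderOptions = SenderOptions.create(props);
    KafkaSender<String, Event> kafkaSender = KafkaSender.create(senderOptions);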

When I try to publish a record to an invalid topic, I get a timeout exception:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 8000 ms

as expected. But the retries I have configured are not happening. I assumed that once max.block.ms/request.timeout.ms had elapsed, a retry would happen every retry.backoff.ms (with metadata refreshed per metadata.max.age.ms) until retries was exhausted. FYI, the code:

    String topic = "order/";
    int count = 1;
    Flux<SenderRecord<String, Event, EventInfo>> source = Flux.range(1, count).map(x -> {
      Event event = new Event();
      return SenderRecord.create(
            new ProducerRecord<String, Event>(topic, event.getX(),
                event), event.getEvent());
    });
    kafkaSender.send(source).subscribe(x -> System.out.println(x));
    kafkaSender.close();
  • Is the configuration to enable retries correct?
  • When does a retry happen after request.timeout.ms/max.block.ms?
  • What changes are needed in the code above to allow retries?

1 Answer

I believe you should also set "delivery.timeout.ms".

See the documentation here: https://docs.confluent.io/current/installation/configuration/producer-configs.html#retries

Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first. Note additionally that produce requests will be failed before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires first before successful acknowledgement. Users should generally prefer to leave this config unset and instead use delivery.timeout.ms to control retry behavior.
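Following that guidance with the question's settings, a minimal sketch of the change, extending the producer properties from the sketch above; the value 30000 is an assumption, and Kafka requires delivery.timeout.ms to be at least request.timeout.ms + linger.ms:

    // delivery.timeout.ms bounds the total time to report success or failure
    // for a send, covering batching, awaiting acks, and retries
    props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 30000); // assumed value
    // per the quoted docs, retries can be left unset so that
    // delivery.timeout.ms controls the retry behavior instead
    props.remove(ProducerConfig.RETRIES_CONFIG);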

Answered 2020-04-22T22:51:30.487