我使用 Reactor Kafka(Kafka 的功能性 Java API)创建了一个 KafkaProducer(reactor.kafka.sender.KafkaSender)。使用以下生产者配置,
max.block.ms = 8000
request.timeout.ms= 4000
retries = 3
retry.backoff.ms = 2000
max.in.flight.requests.per.connection = 512
acks = all
当我尝试将记录发布到无效主题时,出现超时异常
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 8000 ms
正如预期的那样。但是我已经配置了没有发生的重试。我假设在max.block.ms
/request.timeout.ms
已经昏倒之后,每次retry.backoff.ms
直到metadata.max.age.ms
或retries
用尽后都会重试。仅供参考,代码:
String topic = "order/";
int count = 1;
Flux<SenderRecord<String, Event, EventInfo>> source = Flux.range(1, count).map(x -> {
Event event = new Event();
return SenderRecord.create(
new ProducerRecord<String, Event>(topic, event.getX(),
event), event.getEvent());
});
kafkaSender.send(source).subscribe(x -> System.out.println(x));
kafkaSender.close();
- 启用重试的配置是否正确?
request.timeout.ms
/之后何时重试max.block.ms
?- 需要在上面的代码中进行哪些更改以允许重试?