我有一个简单的反应式 Kafka 生产者,我需要它来手动提交事务。在 Reactor Kafka 中的javadoc 方法 begin()之后,我创建了一个生产者方法
@Autowired
private ReactiveKafkaProducerTemplate<String, String> kafkaTemplate;
public Mono<Void> send(String message) {
return kafkaTemplate.transactionManager()
.begin()
.then(kafkaTemplate.send("my.test.topic", message))
.then(kafkaTemplate.transactionManager().commit());
}
它的配置是
@Bean
public ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-tx-1");
return new ReactiveKafkaProducerTemplate<>(SenderOptions.create(props));
}
它运行良好,我正在向 Kafka 发送消息。但现在我需要保护它免受我们公司潜在的 Kafka/网络中断的影响。为了模拟这一点,我停止了我的 Kafka docker,发送一条消息,一段时间后再次启动 Kafka。但是奇怪的事情正在发生。我遇到了 3 种可能的情况:
- 当我在一分钟左右启动 Kafka 时,生产者恢复了连接和事务,并且发送完成得很好
- 当我等待更长时间(约 3 - 4 分钟)时,发件人失败
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for my.test.topic-0:219057 ms has passed since batch creation
此状态不可恢复,每次后续尝试均失败
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
我必须重新启动应用程序
- 当用户取消请求(即触发此发送的 REST 调用)然后 Kafka 再次出现时,每个后续请求都会失败
TransactionalId producer-tx-1: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
这种状态也是不可恢复的,我要重启应用。
问题
如何从这种情况中恢复过来?我想要实现的是生产者等待 fe 20 秒,如果没有重新建立连接,那么只需抛出异常并丢弃请求。最重要的是,系统不应该因为无法访问的 Kafka 而崩溃。
我试过的
起初我以为我需要手动中止事务kafkaTemplate.transactionManager().abort()
in some.doOnError()
但这不起作用(在场景 3 中是 fe。代码不会引发任何异常)。然后我尝试了ProducerConfig
尝试TRANSACTION_TIMEOUT_CONFIG
,REQUEST_TIMEOUT_MS_CONFIG
或者DELIVERY_TIMEOUT_MS_CONFIG
请求没有超时,不确定我做错了什么。