0

我有一个简单的反应式 Kafka 生产者,我需要它来手动提交事务。在 Reactor Kafka 中的javadoc 方法 begin()之后,我创建了一个生产者方法

@Autowired
private ReactiveKafkaProducerTemplate<String, String> kafkaTemplate;

public Mono<Void> send(String message) {
    return kafkaTemplate.transactionManager()
            .begin()
            .then(kafkaTemplate.send("my.test.topic", message))
            .then(kafkaTemplate.transactionManager().commit());
}

它的配置是

@Bean
public ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-tx-1");

    return new ReactiveKafkaProducerTemplate<>(SenderOptions.create(props));
}

它运行良好,我正在向 Kafka 发送消息。但现在我需要保护它免受我们公司潜在的 Kafka/网络中断的影响。为了模拟这一点,我停止了我的 Kafka docker,发送一条消息,一段时间后再次启动 Kafka。但是奇怪的事情正在发生。我遇到了 3 种可能的情况:

  1. 当我在一分钟左右启动 Kafka 时,生产者恢复了连接和事务,并且发送完成得很好
  2. 当我等待更长时间(约 3 - 4 分钟)时,发件人失败
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for my.test.topic-0:219057 ms has passed since batch creation

此状态不可恢复,每次后续尝试均失败

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state

我必须重新启动应用程序

  1. 当用户取消请求(即触发此发送的 REST 调用)然后 Kafka 再次出现时,每个后续请求都会失败
TransactionalId producer-tx-1: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION

这种状态也是不可恢复的,我要重启应用。

问题

如何从这种情况中恢复过来?我想要实现的是生产者等待 fe 20 秒,如果没有重新建立连接,那么只需抛出异常并丢弃请求。最重要的是,系统不应该因为无法访问的 Kafka 而崩溃。

我试过的

起初我以为我需要手动中止事务kafkaTemplate.transactionManager().abort()in some.doOnError()但这不起作用(在场景 3 中是 fe。代码不会引发任何异常)。然后我尝试了ProducerConfig尝试TRANSACTION_TIMEOUT_CONFIGREQUEST_TIMEOUT_MS_CONFIG或者DELIVERY_TIMEOUT_MS_CONFIG请求没有超时,不确定我做错了什么。

4

1 回答 1

0
  1. 这是因为默认的delivery.timeout.ms是 2 分钟。

  2. 在那之后,您将需要创建一个新的生产者并处理旧的生产者。

于 2021-05-24T16:26:25.620 回答