I'm trying to send messages to Kafka inside a transaction, so I use this code:
try (Producer<Void, String> producer = createProducer(kafkaContainerBootstrapServers)) {
    producer.initTransactions();
    producer.beginTransaction();
    Arrays.stream(messages).forEach(
        message -> producer.send(new ProducerRecord<>(KAFKA_INPUT_TOPIC, message)));
    producer.commitTransaction();
}
...
private static Producer<Void, String> createProducer(String kafkaContainerBootstrapServers) {
    return new KafkaProducer<>(
        ImmutableMap.of(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainerBootstrapServers,
            ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString(),
            ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true,
            ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString()
        ),
        new VoidSerializer(),
        new StringSerializer());
}
This works fine against a local Kafka.
But against Kafka TestContainers it hangs in producer.initTransactions():
private static final String KAFKA_VERSION = "4.1.1";
@Rule
public KafkaContainer kafka = new KafkaContainer(KAFKA_VERSION)
    .withEmbeddedZookeeper();
How do I configure KafkaContainer so that transactions work?
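One thing that may be worth checking, as a guess rather than a confirmed diagnosis: the broker's internal transaction state topic defaults to transaction.state.log.replication.factor=3 and transaction.state.log.min.isr=2, which a single-broker container cannot satisfy, and that could leave initTransactions() waiting. The sketch below only collects the two overrides as the environment variables Confluent's Kafka images translate into broker properties. The class and method names are hypothetical, and whether a given Testcontainers version already sets these values is an assumption to verify.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TransactionBrokerEnv {

    // Hypothetical helper: overrides for the internal transaction state topic,
    // written as environment variables (KAFKA_FOO_BAR maps to broker property foo.bar
    // in Confluent's images). Both are set to 1 because only one broker is running.
    static Map<String, String> singleBrokerOverrides() {
        Map<String, String> env = new LinkedHashMap<>();
        env.put("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1");
        env.put("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1");
        return env;
    }

    public static void main(String[] args) {
        // Print the overrides in KEY=value form for inspection.
        singleBrokerOverrides().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

If this is the cause, each entry could be applied to the rule from the question with the container's withEnv(key, value) before the container starts, for example .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1") chained after withEmbeddedZookeeper().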