我有一个 kafka 监听器,它抛出一个异常并转到 seektocurrenterrorhandler 进行处理。我正在尝试包装侦听器以添加闩锁并对其进行断言以进行测试。但它似乎不起作用。有人可以建议。
听众:
@KafkaListener(id="testRetry",topics = "sr1", groupId = "retry-grp", containerFactory = "kafkaRetryListenerContainerFactory")
public void listen1(ConsumerRecord<String, Anky> record,
@Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery, Acknowledgment ack) throws UserException {
throw new UserException(code.getDbError());
}
配置:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Anky> kafkaRetryListenerContainerFactory(SeekToCurrentErrorHandler seekToCurrentErrorHandler) {
ConcurrentKafkaListenerContainerFactory<String, Anky> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
// to get the no retry count from the header
factory.getContainerProperties().setDeliveryAttemptHeader(true);
factory.setErrorHandler(seekToCurrentErrorHandler);
// to keep the consumers alive the failure gets reported to broker so that
// consumers remain alive
factory.setStatefulRetry(true);
factory.setConcurrency(2);
return factory;
}
@Bean
public SeekToCurrentErrorHandler seekToCurrentErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
//do nothing
});
errorHandler.setCommitRecovered(true);
errorHandler.setBackOffFunction((record, exception) -> {
return new FixedBackOff(0L,5L);
});
return errorHandler;
}
测试:
@Test
public void testRetry() throws Exception,UserException {
Anky msg = new Anky();
this.template.send("sr1", "1234", msg);
CountDownLatch latch = new CountDownLatch(1);
ConcurrentMessageListenerContainer<?, ?> container = (ConcurrentMessageListenerContainer<?, ?>) kafkaListenerEndpointRegistry
.getListenerContainer("testRetry");
container.stop();
@SuppressWarnings("unchecked")
AcknowledgingConsumerAwareMessageListener<String, Anky> messageListener = (AcknowledgingConsumerAwareMessageListener<String, Anky>) container
.getContainerProperties().getMessageListener();
container.getContainerProperties()
.setMessageListener(new AcknowledgingConsumerAwareMessageListener<String, Anky() {
@Override
public void onMessage(ConsumerRecord<String, Anky> data, Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {
messageListener.onMessage(data, acknowledgment, consumer);
latch.countDown();
}
});
container.start();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
}