0

我有一个 kafka 监听器,它抛出一个异常并转到 seektocurrenterrorhandler 进行处理。我正在尝试包装侦听器以添加闩锁并对其进行断言以进行测试。但它似乎不起作用。有人可以建议。

听众:

@KafkaListener(id="testRetry",topics = "sr1", groupId = "retry-grp", containerFactory = "kafkaRetryListenerContainerFactory")
    public void listen1(ConsumerRecord<String, Anky> record,
            @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery, Acknowledgment ack) throws UserException {
            throw new UserException(code.getDbError());

    }

配置:

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, Anky> kafkaRetryListenerContainerFactory(SeekToCurrentErrorHandler seekToCurrentErrorHandler) {

        ConcurrentKafkaListenerContainerFactory<String, Anky> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        // to get the no retry count from the header
        factory.getContainerProperties().setDeliveryAttemptHeader(true);
        factory.setErrorHandler(seekToCurrentErrorHandler);
        // to keep the consumers alive the failure gets reported to broker so that
        // consumers remain alive
        factory.setStatefulRetry(true);
        factory.setConcurrency(2);
        return factory;
    }


    @Bean
    public SeekToCurrentErrorHandler seekToCurrentErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
            //do nothing
        });
        errorHandler.setCommitRecovered(true);
        errorHandler.setBackOffFunction((record, exception) -> {
            return new FixedBackOff(0L,5L);

        });
        return errorHandler;
    }

测试:

@Test
public void testRetry() throws Exception,UserException {
    Anky msg = new Anky();
    this.template.send("sr1", "1234", msg);
    CountDownLatch latch = new CountDownLatch(1);
            ConcurrentMessageListenerContainer<?, ?> container = (ConcurrentMessageListenerContainer<?, ?>) kafkaListenerEndpointRegistry
            .getListenerContainer("testRetry");
    container.stop();
    @SuppressWarnings("unchecked")
    AcknowledgingConsumerAwareMessageListener<String, Anky> messageListener = (AcknowledgingConsumerAwareMessageListener<String, Anky>) container
            .getContainerProperties().getMessageListener();

    container.getContainerProperties()
            .setMessageListener(new AcknowledgingConsumerAwareMessageListener<String, Anky() {

                @Override
                public void onMessage(ConsumerRecord<String, Anky> data, Acknowledgment acknowledgment,
                        Consumer<?, ?> consumer) {
                    messageListener.onMessage(data, acknowledgment, consumer);
                    latch.countDown();
                }

            });
    container.start();
    assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
    
}
4

1 回答 1

0

添加一个try...finally块。

try {
    messageListener.onMessage(data, acknowledgment, consumer);
}
finally {
    latch.countDown();
}
于 2020-11-12T15:49:21.817 回答