0

我正在尝试测试 @KafkaListener 所使用的监听器容器工厂(listener container factory)中定义的错误处理程序。

我要验证的是:侦听器抛出不同的异常时,各自对应的重试次数是否按预期生效。但是在侦听器第一次抛出异常并进入错误处理程序之后,我得到了一个 IllegalStateException,因此在我编写的测试中没有进行重试。相同的代码在实际环境中可以正常工作。

这是我得到的例外。

Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException

我希望测试重试 10 次,然后在恢复中打印消息。但它不会重试,因为错误处理程序正在抛出非法状态异常。

有人可以建议吗?

@Configuration
@EnableKafka
public  class Config {

    public static boolean seekPerformed;
    
    public static int retries;
    
    private Integer retryCount=10;

    private Integer RetryCount2=5;
    
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    @Spy
    private errorCodes errorCodes;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Anky>kafkaListenerContainerFactory(EmbeddedKafkaBroker embeddedKafka) {
        ConcurrentKafkaListenerContainerFactory<String, Anky>factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(embeddedKafka));
        factory.getContainerProperties().setDeliveryAttemptHeader(true);
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
            System.out.println(
                    "RetryPolicy** limit has been exceeded! You should really handle this better." + record.key());
        });
        errorHandler.setBackOffFunction((record, exception) -> {
            retries++;
            seekPerformed = true;
            int maxRetryCount = retryCount+ retryCount2;
            Anky msg = (Anky) record.value();

            if (msg.getErrorCode.equals(getExceptionA())) {
                return new FixedBackOff(0L,Long.valueOf(retryCount));
            }
            else {
                return new FixedBackOff(0L,Long.valueOf(retryCount2));
            }

        });
        errorHandler.setCommitRecovered(true);
        factory.setErrorHandler(errorHandler);
        factory.setConcurrency(2);
        //errorHandler.setLogLevel(Level.INFO);
        factory.setStatefulRetry(true);
        return factory;
    }

    @Bean
    public DefaultKafkaConsumerFactory<String, Anky> consumerFactory(EmbeddedKafkaBroker embeddedKafka) {
        //return new DefaultKafkaConsumerFactory<>(consumerConfigs(embeddedKafka));
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(embeddedKafka), new StringDeserializer(),
                new JsonDeserializer<>(EdealsMessage.class, false));
    }

    @Bean
    public Map<String, Object> consumerConfigs(EmbeddedKafkaBroker embeddedKafka) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokerAddress(0).toString());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "retry-grp");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, new StringDeserializer());
        //props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, new JsonDeserializer<>(EdealsMessage.class, false));
        return props;
    }


    @Bean
    public ProducerFactory<String, Object> testProducerFactory(EmbeddedKafkaBroker embeddedKafka) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokerAddress(0).toString());
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configs);
    }

    @Bean
    public KafkaTemplate<String, Object> testKafkaTemplate(EmbeddedKafkaBroker embeddedKafka) {
        KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(testProducerFactory(embeddedKafka));
        kafkaTemplate.setDefaultTopic("sr1");
        return kafkaTemplate;
    }

    @KafkaListener(topics = "sr1", groupId = "retry-grp",containerFactory = "kafkaListenerContainerFactory")
    public void listen1(ConsumerRecord<String, Anky> record,
            @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery) throws AppException{

    
        try {
            throw new AppException(//this is our custom exception in the application);
        }
        catch(AppException se) {
            if(record.value().getNewErrorCode().equals(se.getErrorCode())) {
                System.out.println("are you here?");
                throw se;
            }
    }

}

/**
 * Integration test that publishes one record to the embedded broker and checks that the
 * listener's error handler was invoked for it.
 */
@EnableKafka
@SpringBootTest
@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = Config.class)
@EmbeddedKafka(
        partitions = 1, 
        controlledShutdown = true, topics = {"sr1"},
        brokerProperties = {
            "listeners=PLAINTEXT://localhost:3333", 
            "port=3333"
    })
public class KafkaRetryTest {

    /** Upper bound, in milliseconds, for the send acknowledgement and for the error-handler wait. */
    private static final long TIMEOUT_MILLIS = 10_000L;

    /** Pause between two checks of the error-handler flag, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 100L;

    @Autowired
    private KafkaTemplate<String, Object> template;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Autowired
    EmbeddedKafkaBroker kafkaEmbedded;

    @Spy
    private ErrorCodes errorCodes;

    /**
     * Blocks until every registered listener container has its partitions assigned, so the
     * record sent by the test is not published before a consumer is ready.
     *
     * @throws Exception if the assignment does not happen
     */
    @BeforeEach
    public void setUp() throws Exception {
        for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
            ContainerTestUtils.waitForAssignment(messageListenerContainer,
                    kafkaEmbedded.getPartitionsPerTopic());
        }
    }

    /**
     * Sends a record carrying the "exception A" error code and verifies that the error handler's
     * back-off function ran.
     *
     * @throws Exception if the send is not acknowledged in time or the wait is interrupted
     */
    @Test
    public void testStatefulRetry() throws Exception {
        Anky msg = new Anky();
        msg.setNewErrorCode(errorCodes.getExceptionA());

        // send() is asynchronous: wait for the broker acknowledgement before checking anything.
        this.template.send("sr1", "3323800", msg)
                .get(TIMEOUT_MILLIS, java.util.concurrent.TimeUnit.MILLISECONDS);

        // The listener and its error handler run on the container's consumer thread, so the flag
        // is set some time after the send completes; poll with a deadline rather than asserting
        // immediately.
        long deadline = System.currentTimeMillis() + TIMEOUT_MILLIS;
        while (!Config.seekPerformed && System.currentTimeMillis() < deadline) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }

        assertThat(Config.seekPerformed).isTrue();
        System.out.println("******" + Config.retries);
    }
}

*******更新后的代码:我无法在 @EmbeddedKafka 上使用 bootstrapServersProperty = "spring.kafka.bootstrap-servers",因为它一直尝试连接 localhost:9092 并超时。

这是我的测试文件配置

/**
 * Consumer test running against an embedded broker.
 *
 * <p>The embedded broker publishes its address under {@code spring.kafka.bootstrap-servers}
 * (via {@code bootstrapServersProperty}) and, by default, under
 * {@code spring.embedded.kafka.brokers}. The application's consumer factory overrides the
 * bootstrap servers with the {@code kafka.bootstrapAddress} property, so that property is mapped
 * to the embedded broker as well; otherwise the consumer keeps using the address from the
 * properties file.
 */
@EnableKafka
@SpringBootTest(classes=MyConsumer.class,
    properties = "kafka.bootstrapAddress=${spring.embedded.kafka.brokers}")
@ExtendWith(SpringExtension.class)
@DirtiesContext
@ContextConfiguration(classes = AppConfig.class)
@EmbeddedKafka(
    partitions = 1, 
     topics = {"test_topic"}
             , bootstrapServersProperty ="spring.kafka.bootstrap-servers"
    )
public class KafkaConsumerTest {
}

AppConfig***(从属性文件中读取引导地址)

/** Bootstrap address, injected from the {@code kafka.bootstrapAddress} property. */
@Value("${kafka.bootstrapAddress}")
    private String bootstrapAddress;

/**
 * Builds the consumer factory: starts from the properties produced by
 * {@code kafkaProperties.buildConsumerProperties()} and then overrides the bootstrap servers,
 * group id, auto-commit flag and offset-reset policy with this class's own fields.
 *
 * @return a consumer factory with string keys and JSON-deserialized {@code Anky} values
 */
@Bean
    public ConsumerFactory<String, Anky> consumerFactory() {
        Map<String, Object> settings = kafkaProperties.buildConsumerProperties();

        // Explicit overrides take precedence over whatever kafkaProperties supplied.
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

        StringDeserializer keyDeserializer = new StringDeserializer();
        JsonDeserializer<Anky> valueDeserializer = new JsonDeserializer<>(Anky.class, false);
        return new DefaultKafkaConsumerFactory<>(settings, keyDeserializer, valueDeserializer);
    }
4

1 回答 1

0

日志中会显示 IllegalStateException:

Seek to current after exception

这很正常;您可以通过设置日志级别来抑制它。

这对我来说按预期工作......

/**
 * Minimal application demonstrating {@link SeekToCurrentErrorHandler} retries: a listener that
 * always fails, an error handler with a fixed back-off, and a runner that publishes one record.
 */
@SpringBootApplication
public class So64780994Application {

    public static void main(String[] args) {
        SpringApplication.run(So64780994Application.class, args);
    }

    /**
     * Prints the payload and then always fails, so every delivery goes to the error handler.
     *
     * @param in the record value
     */
    @KafkaListener(id = "so64780994", topics = "so64780994")
    public void listen(String in) {
        System.out.println(in);
        throw new RuntimeException("test");
    }

    /** Declares the single-partition, single-replica topic the listener consumes from. */
    @Bean
    NewTopic topic() {
        return TopicBuilder.name("so64780994")
                .partitions(1)
                .replicas(1)
                .build();
    }

    /**
     * Error handler that uses {@code FixedBackOff(0L, 8)} for every record and prints the record
     * once retries are exhausted; its own logging is lowered to DEBUG.
     */
    @Bean
    ErrorHandler handler() {
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(
                (failedRecord, cause) -> System.out.println("Retries exhausted:" + failedRecord));
        errorHandler.setBackOffFunction((failedRecord, cause) -> new FixedBackOff(0L, 8));
        errorHandler.setLogLevel(Level.DEBUG);
        return errorHandler;
    }

    /** Publishes a single {@code "foo"} record to the topic when the application starts. */
    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so64780994", "foo");
        };
    }

}
foo
2020-11-11 09:20:04.836  INFO 65759 --- [o64780994-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-so64780994-1, groupId=so64780994] Seeking to offset 2 for partition so64780994-0
foo
2020-11-11 09:20:05.334  INFO 65759 --- [o64780994-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-so64780994-1, groupId=so64780994] Seeking to offset 2 for partition so64780994-0
foo
2020-11-11 09:20:05.836  INFO 65759 --- [o64780994-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-so64780994-1, groupId=so64780994] Seeking to offset 2 for partition so64780994-0
foo
2020-11-11 09:20:06.338  INFO 65759 --- [o64780994-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-so64780994-1, groupId=so64780994] Seeking to offset 2 for partition so64780994-0
foo
2020-11-11 09:20:06.843  INFO 65759 --- [o64780994-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-so64780994-1, groupId=so64780994] Seeking to offset 2 for partition so64780994-0
foo
2020-11-11 09:20:07.347  INFO 65759 --- [o64780994-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-so64780994-1, groupId=so64780994] Seeking to offset 2 for partition so64780994-0
foo
2020-11-11 09:20:07.856  INFO 65759 --- [o64780994-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-so64780994-1, groupId=so64780994] Seeking to offset 2 for partition so64780994-0
foo
2020-11-11 09:20:08.361  INFO 65759 --- [o64780994-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-so64780994-1, groupId=so64780994] Seeking to offset 2 for partition so64780994-0
foo
Retries exhausted:ConsumerRecord(topic = so64780994, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1605104404708, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = foo)
于 2020-11-11T14:22:30.763 回答