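You can try using nack(long sleep), where the only parameter is the sleep interval in milliseconds, to achieve the behavior described above.
From the Spring for Apache Kafka documentation:
Starting with version 2.3, the Acknowledgment interface has two additional methods: nack(long sleep) and nack(int index, long sleep). The first is used with a record listener, the second with a batch listener. Calling the wrong method for your listener type will throw an IllegalStateException.
Applying this to a code example, we get: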
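import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ExampleConsumer {

    // Toggle used only to force a nack for demonstration purposes
    private boolean nonError = false;

    @KafkaListener(topics = "topic_name")
    private void consumeSelectingMsgFromMailbox(ConsumerRecord<String, KafkaEventPojo> record, Acknowledgment ack) {
        log.info("Received record topic:{} partition:{} offset:{}", record.topic(), record.partition(), record.offset());
        if (nonError) {
            log.info("ACK: {}", record.offset());
            ack.acknowledge(); // send ack
            if (record.offset() % 2 == 0) {
                nonError = false; // reject the next record
            }
        } else {
            ack.nack(0); // immediate seek - no sleep time for consumer
            nonError = true;
        }
    }
}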
The configuration looks like this:
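import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    private ConcurrentKafkaListenerContainerFactory<String, KafkaEventPojo> factory;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // ...
        // Offsets are committed by the container, not by the Kafka client
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // ...
        return props;
    }

    @Bean
    public ConsumerFactory<String, KafkaEventPojo> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaEventPojo> kafkaListenerContainerFactory() {
        if (this.factory == null) {
            this.factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            // Manual ack mode is what makes the Acknowledgment argument available to the listener
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        }
        return this.factory;
    }
}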
The example produces:
2020-07-31 17:05:19.275 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:15
2020-07-31 17:05:19.792 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:15
2020-07-31 17:05:19.793 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 15
2020-07-31 17:05:19.805 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:16
2020-07-31 17:05:19.805 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 16
2020-07-31 17:05:19.810 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:17
2020-07-31 17:05:20.313 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:17
2020-07-31 17:05:20.313 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 17
2020-07-31 17:05:20.318 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:18
2020-07-31 17:05:20.318 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 18
2020-07-31 17:05:20.322 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:19
2020-07-31 17:05:20.827 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:19
2020-07-31 17:05:20.828 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 19
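To make it easier to see why every odd offset shows up twice in the log, here is a small stand-alone sketch of the same toggle logic. It does not use Kafka or Spring at all: the class NackSimulation and its methods are hypothetical names made up for illustration, and the "redelivery" is modelled simply by not advancing the offset after a nack.

```java
import java.util.ArrayList;
import java.util.List;

public class NackSimulation {

    // Mirrors the flag in the listener: false means "reject the next record"
    private boolean nonError = false;

    /** Returns true when the record would be acknowledged, false when it would be nacked. */
    boolean handle(long offset) {
        if (nonError) {
            if (offset % 2 == 0) {
                nonError = false; // after an even offset, reject the following record
            }
            return true;
        }
        nonError = true; // the redelivered record will be accepted
        return false;
    }

    /** Replays offsets first..last, delivering the same offset again after each nack. */
    static List<String> run(long first, long last) {
        NackSimulation listener = new NackSimulation();
        List<String> events = new ArrayList<>();
        long offset = first;
        while (offset <= last) {
            events.add("received " + offset);
            if (listener.handle(offset)) {
                events.add("ack " + offset);
                offset++; // acknowledged: move on to the next record
            }
            // on nack the offset is left unchanged, modelling the seek back
        }
        return events;
    }

    public static void main(String[] args) {
        run(15, 19).forEach(System.out::println);
    }
}
```

Running it for offsets 15 to 19 gives the same received/ack order as the log above: 15, 17 and 19 are received twice and acknowledged on the second delivery, while 16 and 18 are acknowledged straight away.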
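Note: KafkaEventPojo is my own POJO implementation. It holds the record data stored in Kafka according to our internal structure, so you can change it as needed. Also, the code above demonstrates nack for a single-record listener. If you need the batch option, you can find an example of how to do it in the documentation mentioned above.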
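For the batch variant, nack(int index, long sleep), my understanding of the documented behavior is that the offsets of records before the index are committed, and the record at the index plus everything after it is redelivered once the sleep time has passed. The sketch below is only a plain-Java model of that split, not Spring code: BatchNackModel, committedBefore and redeliveredFrom are hypothetical names used for illustration.

```java
import java.util.List;

public class BatchNackModel {

    /** Offsets that would be committed: everything before the nacked index. */
    static List<Long> committedBefore(List<Long> batchOffsets, int index) {
        checkIndex(batchOffsets, index);
        return List.copyOf(batchOffsets.subList(0, index));
    }

    /** Offsets that would be redelivered: the nacked record and all records after it. */
    static List<Long> redeliveredFrom(List<Long> batchOffsets, int index) {
        checkIndex(batchOffsets, index);
        return List.copyOf(batchOffsets.subList(index, batchOffsets.size()));
    }

    private static void checkIndex(List<Long> batchOffsets, int index) {
        if (index < 0 || index >= batchOffsets.size()) {
            throw new IllegalArgumentException("index out of range: " + index);
        }
    }

    public static void main(String[] args) {
        List<Long> batch = List.of(10L, 11L, 12L, 13L);
        // Assume processing failed on the third record (index 2)
        System.out.println("committed:   " + committedBefore(batch, 2));
        System.out.println("redelivered: " + redeliveredFrom(batch, 2));
    }
}
```

In a real batch listener the equivalent call would be ack.nack(2, sleepMs) on the Acknowledgment passed to the listener method, with the container doing the commit and the seek.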