5

我使用Spring Kafka API来实现具有手动偏移管理的 Kafka 消费者:

@KafkaListener(topics = "some_topic")
public void onMessage(@Payload Message message, Acknowledgment acknowledgment) {
    if (someCondition) {
        acknowledgment.acknowledge();
    }
}

someCondition在这里,我希望消费者仅在持有时才提交偏移量。否则消费者应该休眠一段时间并再次阅读相同的消息

卡夫卡配置:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig());
    factory.getContainerProperties().setAckMode(MANUAL);
    return factory;
}

private Map<String, Object> consumerConfig() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    ...
    return props;
}

使用当前配置,如果someCondition == false,消费者不会提交偏移量,但仍会读取下一条消息。如果没有执行 Kafka,有没有办法让消费者重新阅读消息acknowledgement

4

3 回答 3

8

正如@Gary 已经指出的那样,您的方向是正确的,seek()是这样做的方法。今天遇到这个问题时,我找不到它的代码示例。这是任何想要解决问题的人的代码。

public class Receiver implements AcknowledgingMessageListener<Integer, String>, ConsumerSeekAware {

    private ConsumerSeekCallback consumerSeekCallback;


    @Override
    public void onMessage(ConsumerRecord<Integer, String> record, Acknowledgment acknowledgment) {

        if (/*some condition*/) {
            //process
            acknowledgment.acknowledge(); //send ack
        } else {

            consumerSeekCallback.seek("your.topic", record.partition(), record.offset());

        }
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
        this.consumerSeekCallback = consumerSeekCallback;
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

        // nothing is needed here for this program
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

        // nothing is needed here for this program
    }

}
于 2016-12-22T10:49:07.310 回答
4

您可以停止并重新启动容器,它将被重新发送。

在即将发布的 1.1 版本中,您可以寻找所需的偏移量,它将被重新发送。

但是,如果它们已经被检索到,您仍然会首先看到稍后的消息,因此您也必须丢弃这些消息。

第二个里程碑具有该功能,我们预计它将在下周发布。

于 2016-09-16T16:42:04.527 回答
0

您可以尝试使用nack(long sleep)where 唯一参数表示sleep interval ms来实现上述行为。

来自Spring for Apache Kafka 文档

版本 2.3开始,Acknowledgment 接口有两个额外的方法 nack(long sleep) 和 nack(int index, long sleep)。第一个用于记录侦听器,第二个用于批处理侦听器。为您的侦听器类型调用错误的方法将引发 IllegalStateException。

将上述信息应用到代码示例中,我们得到:

@Component
@Slf4j
public class ExampleConsumer {
    private boolean nonError = false;
    
    @KafkaListener(topics = "topic_name")
    private void consumeSelectingMsgFromMailbox(ConsumerRecord<String, KafkaEventPojo> record, Acknowledgment ack) {
        log.info("Received record topic:{} partition:{} offset:{}", record.topic(), record.partition(), record.offset());
        
        if (nonError) {
            log.info("ACK: {}", offset);
            ack.acknowledge(); //send ack
            if (offset % 2 == 0)
                nonError = false;
        } else {
            ack.nack(0); // immediate seek - no sleep time for consumer
            nonError = true;
        }
    }
}

配置如下所示:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    private ConcurrentKafkaListenerContainerFactory<String, KafkaEventPojo> factory;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // ...
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // ...

        return props;
    }

    @Bean
    public ConsumerFactory<String, KafkaEventPojo> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaEventPojo> kafkaListenerContainerFactory() {
        if (this.factory == null) {
            this.factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        }

        return this.factory;
    }

该示例产生:

2020-07-31 17:05:19.275  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:15
2020-07-31 17:05:19.792  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:15
2020-07-31 17:05:19.793  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 15
2020-07-31 17:05:19.805  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:16
2020-07-31 17:05:19.805  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 16
2020-07-31 17:05:19.810  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:17
2020-07-31 17:05:20.313  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:17
2020-07-31 17:05:20.313  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 17
2020-07-31 17:05:20.318  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:18
2020-07-31 17:05:20.318  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 18
2020-07-31 17:05:20.322  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:19
2020-07-31 17:05:20.827  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:19
2020-07-31 17:05:20.828  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 19

注意:KafkaEventPojo是我的 POJO 实现,它按照我们的内部结构保存存储在 Kafka 中的记录数据 - 所以您可以根据需要更改它。此外,上面的代码演示了 nack 用于单记录侦听器的用法。如果您需要批处理选项,您可以在提供的文档中找到如何执行此操作的示例。

于 2020-07-31T14:49:37.027 回答