1

我们正在尝试实施 Kafka 作为我们的消息代理解决方案。我们正在 IBM BLuemix 中部署 Spring Boot 微服务,其内部消息代理实现是 Kafka 版本 0.10。由于我的经验更多是在 JMS、ActiveMQ 端,我想知道在 java 消费者中处理系统级错误的理想方法应该是什么?

以下是我们目前的实施方式

消费者财产

enable.auto.commit=false
auto.offset.reset=latest

我们使用默认属性

max.partition.fetch.bytes
session.timeout.ms

卡夫卡消费者

我们为每个主题旋转 3 个线程,它们都具有相同的 groupId,即每个线程一个 KafkaConsumer 实例。到目前为止,我们只有一个分区。消费者代码在线程类的构造函数中是这样的

kafkaConsumer = new KafkaConsumer<String, String>(properties);

    final List<String> topicList = new ArrayList<String>();
    topicList.add(properties.getTopic());

    kafkaConsumer.subscribe(topicList, new ConsumerRebalanceListener() {

        @Override
        public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
        }

        @Override
        public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
            try {
                logger.info("Partitions assigned, consumer seeking to end.");

                for (final TopicPartition partition : partitions) {
                    final long position = kafkaConsumer.position(partition);
                    logger.info("current Position: " + position);

                    logger.info("Seeking to end...");
                    kafkaConsumer.seekToEnd(Arrays.asList(partition));
                    logger.info("Seek from the current position: " + kafkaConsumer.position(partition));
                    kafkaConsumer.seek(partition, position);
                }
                logger.info("Consumer can now begin consuming messages.");
            } catch (final Exception e) {
                logger.error("Consumer can now begin consuming messages.");
            }

        }
    });  

实际读取发生在线程的 run 方法中

try {
            // Poll on the Kafka consumer every second.
            final ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);


            // Iterate through all the messages received and print their
            // content.
            for (final TopicPartition partition : records.partitions()) {

                final List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                logger.info("consumer is alive and is processing   "+ partitionRecords.size() +" records");
                for (final ConsumerRecord<String, String> record : partitionRecords) {
                    logger.info("processing topic  "+ record.topic()+" for key "+record.key()+" on offset "+ record.offset());

                    final Class<? extends Event> resourceClass = eventProcessors.getResourceClass();
                    final Object obj = converter.convertToObject(record.value(), resourceClass);
                    if (obj != null) {
                        logger.info("Event: " + obj + " acquired by  " + Thread.currentThread().getName());
                        final CommsEvent event = resourceClass.cast(converter.convertToObject(record.value(), resourceClass));
                        final MessageResults results = eventProcessors.processEvent(event
                                );
                        if ("Success".equals(results.getStatus())) {
                            // commit the processed message which changes
                            // the offset
                            kafkaConsumer.commitSync();
                            logger.info("Message processed sucessfully");
                        } else {
                            kafkaConsumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
                            logger.error("Error processing message : {} with error : {},resetting offset to {} ", obj,results.getError().getMessage(),record.offset());
                            break;
                        }

                    }
                }

            }
            // TODO add return

        } catch (final Exception e) {
            logger.error("Consumer has failed with exception: " + e, e);
            shutdown();
        }  

您会注意到 EventProcessor 是一个处理每条记录的服务类,在大多数情况下提交数据库中的记录。如果处理器抛出错误(系统异常或验证异常),我们不会提交,而是以编程方式将搜索设置为该偏移量,以便后续轮询将从该组 id 的该偏移量返回。

现在的疑问是,这是正确的方法吗?如果我们遇到错误并且我们设置了偏移量,那么在修复之前不会处理其他消息。这可能适用于系统错误,例如无法连接到数据库,但如果问题仅在于该事件而不是其他事件来处理这一记录,我们将无法处理任何其他记录。我们想到了 ErrorTopic 的概念,当我们遇到错误时,消费者会将该事件发布到 ErrorTopic,同时它将继续处理其他后续事件。但是看起来我们正在尝试将 JMS 的设计概念(由于我之前的经验)引入 kafka,并且可能有更好的方法来解决 kafka 中的错误处理。此外,从错误主题重新处理它可能会更改某些情况下我们不想要的消息序列

请让我知道有人是如何按照 Kafka 标准在他们的项目中处理这种情况的。

-塔塔

4

1 回答 1

1

如果问题仅在于该事件而不是其他事件来处理这一记录,我们将无法处理任何其他记录

这是正确的,您使用错误主题的建议似乎是可能的。

我还注意到,在您处理onPartitionsAssigned您的过程中,基本上不使用消费者承诺的偏移量,因为您似乎总是会寻求到最后。

如果你想从最后一个成功提交的偏移量重新开始,你不应该执行seek

最后,我想指出,虽然看起来你知道,在同一个组中有 3 个消费者订阅了一个分区 - 意味着 3 个消费者中有 2 个将是空闲的。

HTH江户

于 2017-05-24T09:49:24.637 回答