0

描述

大约一个月前,我们从使用 eachMessage 切换到 eachBatch,最近注意到我们服务的消费者没有处理来自随机分区的事件。生产者没有发现任何问题,并且在正常工作大约一个月后才开始出现问题,而我们的配置或使用方法没有任何更改。唯一发生的变化是在我们的基础架构方面,我们在事件中心启用了捕获;但是,尚不清楚这如何导致此问题。经过进一步调查,我们注意到偏移量以某种方式被破坏并且不再处于应有的位置。实际上,消费者停止阅读消息,因为似乎没有任何消息要阅读。我们将offset重置为latest,解决了一段时间的问题;然而,它在一两个小时后返回(再次随机)。值得一提的是,使用 eachMessage 恢复处理可以防止这种偏移量的损坏。

我在下面包含了配置和示例实现。非常感谢您对此的任何帮助。

配置

this.allowAutoTopicCreation = false;
this.autoCommit = false;
this.autoCommitThreshold = 1;
this.consumerRetryOptions = {
  retries: 2,
  factor: 2,
  minTimeout: 2000,
  maxTimeout: 10000,
  randomize: true,
};
this.continueOnFailedEventProcessing = false;
this.eachBatchAutoResolve = false;
this.heartbeatInterval = undefined;
this.maxSleepCycles = 1;
this.partitionsConsumedConcurrently = 8;
this.recoveryPosition = false;

示例实现

public async consume () => {
  await this.client.run({
    eachBatchAutoResolve: false,
    autoCommitThreshold: 1,
    partitionsConsumedConcurrently: 8,
    eachBatch: async ({
      batch,
      resolveOffset,
      heartbeat,
      commitOffsetsIfNecessary,
    }) => {
      /* logic to prevent concurrent event processing on the same partition. */

      for (const message of batch.messages) {
        let event: Event<any>;
        const partitionKey = message.key ? message.key.toString() : message.key;
        const { offset, timestamp } = message;
        const context: EventContext = {
          topic,
          partition,
          partitionKey,
          offset,
          timestamp,
        };
        
        try {
          // transform kafka message to custome event
          event = await this.transform(message);
        } catch (error) {
          try {
            resolveOffset(offset);
            await heartbeat();
            await commitOffsetsIfNecessary();
          } catch (commitError) {
                /* signal and await shutdown */
          }
          break;
        }
        
        try {
          /* process event with custom handler and retry logic */
          
      resolveOffset(offset);
          await commitOffsetsIfNecessary();
          await heartbeat();
        } catch (error) {
          
      }
      this.partitionTracker.stopProcessing(partitionId);
      await this.shutdownCheck();
    },
  });
};

环境

  • 操作系统:Mac OS 11.2.3
  • KafkaJS 版本:1.14.0
  • 平台:微软活动
  • 集线器 NodeJS 版本:12.x
4

0 回答 0