描述
大约一个月前,我们从使用 eachMessage 切换到 eachBatch,最近注意到我们服务的消费者没有处理来自随机分区的事件。生产者没有发现任何问题,并且在正常工作大约一个月后才开始出现问题,而我们的配置或使用方法没有任何更改。唯一发生的变化是在我们的基础架构方面,我们在事件中心启用了捕获;但是,尚不清楚这如何导致此问题。经过进一步调查,我们注意到偏移量以某种方式被破坏并且不再处于应有的位置。实际上,消费者停止阅读消息,因为似乎没有任何消息要阅读。我们将offset重置为latest,解决了一段时间的问题;然而,它在一两个小时后返回(再次随机)。值得一提的是,使用 eachMessage 恢复处理可以防止这种偏移量的损坏。
我在下面包含了配置和示例实现。非常感谢您对此的任何帮助。
配置
this.allowAutoTopicCreation = false;
this.autoCommit = false;
this.autoCommitThreshold = 1;
this.consumerRetryOptions = {
retries: 2,
factor: 2,
minTimeout: 2000,
maxTimeout: 10000,
randomize: true,
};
this.continueOnFailedEventProcessing = false;
this.eachBatchAutoResolve = false;
this.heartbeatInterval = undefined;
this.maxSleepCycles = 1;
this.partitionsConsumedConcurrently = 8;
this.recoveryPosition = false;
示例实现
public async consume () => {
await this.client.run({
eachBatchAutoResolve: false,
autoCommitThreshold: 1,
partitionsConsumedConcurrently: 8,
eachBatch: async ({
batch,
resolveOffset,
heartbeat,
commitOffsetsIfNecessary,
}) => {
/* logic to prevent concurrent event processing on the same partition. */
for (const message of batch.messages) {
let event: Event<any>;
const partitionKey = message.key ? message.key.toString() : message.key;
const { offset, timestamp } = message;
const context: EventContext = {
topic,
partition,
partitionKey,
offset,
timestamp,
};
try {
// transform kafka message to custome event
event = await this.transform(message);
} catch (error) {
try {
resolveOffset(offset);
await heartbeat();
await commitOffsetsIfNecessary();
} catch (commitError) {
/* signal and await shutdown */
}
break;
}
try {
/* process event with custom handler and retry logic */
resolveOffset(offset);
await commitOffsetsIfNecessary();
await heartbeat();
} catch (error) {
}
this.partitionTracker.stopProcessing(partitionId);
await this.shutdownCheck();
},
});
};
环境:
- 操作系统:Mac OS 11.2.3
- KafkaJS 版本:1.14.0
- 平台:微软活动
- 集线器 NodeJS 版本:12.x