我创建了单个连接器以将集合的所有插入/更新事件推送到同一主题(1 个分区)。处理插入事件消息的消费者代码将比更新事件花费一些时间。
这里的问题是消息没有根据消费顺序提交。以下是Kafka消费者的步骤和行为。
- 在集合中插入一条记录并在一秒钟内更新该字段。
- 该主题现在有 2 条记录。一个用于插入事件(消息 1),另一个用于更新事件(消息 2)。
- 消息 1 被消费
- 消息 2 被消费
- 消息 2 已提交
- 消息 1 已提交
有什么办法可以等到消息 1 提交后再使用消息 2。
消费者守则:
const startConsumer = async () => {
// Creating a kafka consumer group.
const kafkaConsumerGroup = new kafkaNode.ConsumerGroup(consumer_configs, topic_name);
kafkaConsumerGroup.on("connect", () => {
console.log("Consumer Group Connected Successfully");
});
// Listening for messages from kafka.
kafkaConsumerGroup.on("message", async (message) => {
//processor code
const data = JSON.parse(message.value);
response = await processData(JSON.parse(data.payload));
//Commit
kafkaConsumerGroup.commit((error, data) => {
if (error) {
console.log(error);
} else {
console.log('Commit success ');
}
});
});
kafkaConsumerGroup.on("error", (error) => {
console.log(error);
});
};