0

我创建了单个连接器以将集合的所有插入/更新事件推送到同一主题(1 个分区)。处理插入事件消息的消费者代码将比更新事件花费一些时间。

这里的问题是消息没有根据消费顺序提交。以下是Kafka消费者的步骤和行为。

  1. 在集合中插入一条记录并在一秒钟内更新该字段。
  2. 该主题现在有 2 条记录。一个用于插入事件(消息 1),另一个用于更新事件(消息 2)。
  3. 消息 1 被消费
  4. 消息 2 被消费
  5. 消息 2 已提交
  6. 消息 1 已提交

有什么办法可以等到消息 1 提交后再使用消息 2。

消费者守则:

    const startConsumer = async () => {
    
        // Creating a kafka consumer group.
        const kafkaConsumerGroup = new kafkaNode.ConsumerGroup(consumer_configs, topic_name);
    
        kafkaConsumerGroup.on("connect", () => {
            console.log("Consumer Group Connected Successfully");
        });
    
        // Listening for messages from kafka.
        kafkaConsumerGroup.on("message", async (message) => {
    
                //processor code
                const data = JSON.parse(message.value);
                response = await processData(JSON.parse(data.payload));
    
                //Commit
                kafkaConsumerGroup.commit((error, data) => {
                    if (error) {
                        console.log(error);
                    } else {
                        console.log('Commit success ');
                    }
                });
        });
    
        kafkaConsumerGroup.on("error", (error) => {
            console.log(error);
        });
    };
4

0 回答 0