0

我创建了一个 NodeJS 应用程序来将数据插入到 MongoDB 集合中。此数据库插入是通过使用 Kafka 完成的。Kafka-node 是我用来调用 Kafka 的插件。

我可以创建主题并向生产者级别的消费者发送消息。消息和主题取自 POST 请求。
这就是我所说的卡夫卡。参数是主题和消息。

每次我调用此 API 时,生产者都会创建一条新消息并将其发送给消费者。在每次调用中,之前的所有消息都会返回给消费者。

我已经使用fromOffset: 'earliest'andfromOffset: 'latest'选项来限制以前的消息,但不起作用。

谁能给我一个建议?卡夫卡节点的版本

"kafka-node": "^5.0.0",

我用过的代码

var kafka = require('kafka-node');
const {MongoClient} = require('mongodb');
var url = 'mongodb://127.0.0.1:27017/';
const mongoClient = new MongoClient(url);
var Producer = kafka.Producer,
    client = new kafka.KafkaClient(),
    offset = new kafka.Offset(client),
    Consumer = kafka.Consumer,
    producer = new Producer(client);
producer.on('ready', function () {
        console.log('Producer is ready');
    });
producer.on('error', function (err) {
        console.log('Producer is in error state');
        console.log(err);
    })
const createProducer = async(req,res,next) => {
  var topic = req.body.topic;
  var sentMessage = JSON.stringify(req.body.messages);
  producer.send(payloads, async function( err, data) {
        })
  client = new kafka.KafkaClient(),
  consumer = new Consumer(client,
            [
                { topic: topic, partition: 0 }
            ],
            {
                autoCommit: false,
                fromOffset: 'earliest'
            }
        );
   consumer.on('message', async function (message) {            

      console.log("Message : "+JSON.stringify(message))
      try {            
        var currentdate = new Date(); 
        var datetime = "Last Sync: " + currentdate.getDate() + "/"
            + (currentdate.getMonth()+1)  + "/" 
            + currentdate.getFullYear() + " @ "  
            + currentdate.getHours() + ":"  
            + currentdate.getMinutes() + ":" 
            + currentdate.getSeconds();
        var abb = await createListing(mongoClient,
            {
              topic: topic,
              message: sentMessage,
              time: datetime
             }
             );
             
      } catch (e) {
         console.error(":"+e);
      }
       finally {
                
       }
    });
     await mongoClient.close();
        res.send({
            message: 'Successfully send data from producer',
            payloads: payloads
        })

    async function createListing(client, newListing){
        await mongoClient.connect();
        const result = await 
       
      client.db("sample_airbnb").collection("listingsAndReviews").insertOne(newListing);
        console.log(`New listing created with the following id: ${result.insertedId}`);
        return result.insertedId;
      }
}

消费者消费的消息是

谢谢,

4

2 回答 2

0

您的消费者将始终使用其消费者组之前未标记为已消费的所有偏移量。

这意味着在消费给定消息(或一批消息)之后,您需要将最高消费偏移量提交给您的 Kafka 集群,以有效地将这些消息标记为已消费。只有这样,您的消费者组才不会在启动时重新使用这些消息。

要提交偏移量,您可以使用 kafka.js 的autoCommit功能(您在实现中明确禁用),或者使用 kafka.js 提供的 API 手动提交偏移量。

您可以在此处找到两者的文档:https ://kafka.js.org/docs/sumption#a-name-auto-commit-a-autocommit

于 2022-01-03T12:21:39.110 回答
0

我对代码进行了一些更改,现在我可以从我的主题中检索最新消息。

我在 中创建了消费者offset.fetchLatestOffsets([topics],cb),并对消费者选项进行了一些更改。

var payloads = [
    { topic: topicName, messages: messageTotopic, partition: 0}
];
producer.send(payloads, async function(err, data) {

});
var client = new kafka.KafkaClient();
offset.fetchLatestOffsets([topic], async function (error, offsets) {
    if (error)
        console.log(error);
    offsetA = JSON.stringify(offsets[topic][0])
    console.log('offset Value:: '+offsetA);
    var consumer = new Consumer(
        client,
        [
            {
                topic: topic,
                partition: 0,
                offset: offsetA-1, // Offset value starts from 0
            }
        ], {
            autoCommit: false,
            fromOffset: true,
        }
    );
    consumer.on('message', async function (message) {
        console.log("Message from last offset:: " + JSON.stringify(message)); // will return the latest message.
        consumer.close();
    });
});

使用这种方式,我能够克服与KafkaClient.

于 2022-01-05T06:35:13.330 回答