我创建了一个 NodeJS 应用程序来将数据插入到 MongoDB 集合中。此数据库插入是通过使用 Kafka 完成的。Kafka-node 是我用来调用 Kafka 的插件。
我可以创建主题并向生产者级别的消费者发送消息。消息和主题取自 POST 请求。
这就是我所说的卡夫卡。参数是主题和消息。
每次我调用此 API 时,生产者都会创建一条新消息并将其发送给消费者。在每次调用中,之前的所有消息都会返回给消费者。
我已经使用fromOffset: 'earliest'
andfromOffset: 'latest'
选项来限制以前的消息,但不起作用。
谁能给我一个建议?卡夫卡节点的版本
"kafka-node": "^5.0.0",
我用过的代码
var kafka = require('kafka-node');
const {MongoClient} = require('mongodb');
var url = 'mongodb://127.0.0.1:27017/';
const mongoClient = new MongoClient(url);
var Producer = kafka.Producer,
client = new kafka.KafkaClient(),
offset = new kafka.Offset(client),
Consumer = kafka.Consumer,
producer = new Producer(client);
producer.on('ready', function () {
console.log('Producer is ready');
});
producer.on('error', function (err) {
console.log('Producer is in error state');
console.log(err);
})
const createProducer = async(req,res,next) => {
var topic = req.body.topic;
var sentMessage = JSON.stringify(req.body.messages);
producer.send(payloads, async function( err, data) {
})
client = new kafka.KafkaClient(),
consumer = new Consumer(client,
[
{ topic: topic, partition: 0 }
],
{
autoCommit: false,
fromOffset: 'earliest'
}
);
consumer.on('message', async function (message) {
console.log("Message : "+JSON.stringify(message))
try {
var currentdate = new Date();
var datetime = "Last Sync: " + currentdate.getDate() + "/"
+ (currentdate.getMonth()+1) + "/"
+ currentdate.getFullYear() + " @ "
+ currentdate.getHours() + ":"
+ currentdate.getMinutes() + ":"
+ currentdate.getSeconds();
var abb = await createListing(mongoClient,
{
topic: topic,
message: sentMessage,
time: datetime
}
);
} catch (e) {
console.error(":"+e);
}
finally {
}
});
await mongoClient.close();
res.send({
message: 'Successfully send data from producer',
payloads: payloads
})
async function createListing(client, newListing){
await mongoClient.connect();
const result = await
client.db("sample_airbnb").collection("listingsAndReviews").insertOne(newListing);
console.log(`New listing created with the following id: ${result.insertedId}`);
return result.insertedId;
}
}
谢谢,