我创建了一个生产者和一个消费者,以使用“kafka-node”包发送和消费来自 Kafka 主题的消息。生产者和消费者通过 API 调用。POST 方法用于向主题发送消息,而 GET 方法用于从消费者的主题中获取消息。
当我在向 KAFKA 发送消息后调用消费者 API 时,之前的所有消息都在consumer.on('message', function(message){})
.
我只需要从生产者发送的最后一条消息。
生产者.js
var kafka = require('kafka-node');
const Admin = require('kafka-node/lib/admin');
var Producer = kafka.Producer,
client = new kafka.KafkaClient(),
producer = new Producer(client);
producer.on('ready', function () {
console.log('Producer is ready');
});
producer.on('error', function (err) {
console.log('Producer is in error state');
console.log(err);
})
const createProducer = async(req,res,next) => {
var sentMessage = JSON.stringify(req.body.messages);
payloads = [
{ topic: req.body.topic, messages: sentMessage, partition: 0}
];
producer.createTopics(req.body.topic, function(err,data){
console.log("createtopic error"+err);
})
producer.send(payloads, function( err, data) {
console.log(payloads);
})
res.send({
message: 'Successfully send data from producer',
payloads: payloads
})
}
module.exports = {createProducer}
消费者.js
var kafka = require('kafka-node');
const createConsumer = async(req,res,next) =>{
var topic = req.params.topic;
Consumer = kafka.Consumer,
client = new kafka.KafkaClient(),
// offset = new kafka.Offset(client),
consumer = new Consumer(client,
[{ topic: topic, offset: 0}],
{
autoCommit: false,
}
);
consumer.on('message', function (message) {
console.log('message : ',message.value);
});
res.send({
message: "consumer created : ",
topic: topic
})
consumer.on('error', function (err) {
console.log('Error:',err);
})
consumer.on('offsetOutOfRange', function (err) {
console.log("absbs");
topic.maxNum = 2;
offset.fetch([topic], function (err, offsets) {
if (err) {
return console.error(err);
}
var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
consumer.setOffset(topic.topic, topic.partition, min);
});
})
}
module.exports = {createConsumer}
路由器.js
const express = require("express");
const kafkaConsumer = require("../kafka/consumer") // create routes for consumer
const kafkaproducer = require("../kafka/producer") // create routes for producer
router.post("/kafka/producer",kafkaproducer.createProducer);
router.get("/kafka/consumer/:topic",kafkaConsumer.createConsumer);
module.exports = router;
index.js
var express = require('express');
var app = express();
var kafka = require('kafka-node');
var router = require('./routes/routes');
const {
PORT
} = require("./config/config");
var bodyParser = require('body-parser');
app.use( bodyParser.json());
app.use(bodyParser.urlencoded({
extended: true
}));
app.listen(PORT, function(){
console.log('Kafka producer running at 5000');
});
app.use('/', router)
module.export = app;
如何在不使用任何数组或任何东西的情况下获取最后一条消息。
有什么办法可以删除主题吗?
谢谢,