0

我创建了一个生产者和一个消费者,以使用“kafka-node”包发送和消费来自 Kafka 主题的消息。生产者和消费者通过 API 调用。POST 方法用于向主题发送消息,而 GET 方法用于从消费者的主题中获取消息。

当我向 Kafka 发送消息后再调用消费者 API 时,该主题中之前的所有消息都会在 consumer.on('message', function(message){}) 回调中被重新接收到。

我只需要生产者发送的最后一条消息。

producer.js

var kafka = require('kafka-node');
const Admin = require('kafka-node/lib/admin');
// Module-level Kafka client and producer, shared by every request handled
// in this file.
const { Producer } = kafka;
const client = new kafka.KafkaClient();
const producer = new Producer(client);

// Log when the producer emits its 'ready' event.
producer.on('ready', () => {
    console.log('Producer is ready');
});

// Log producer-level errors together with the error object itself.
producer.on('error', (err) => {
    console.log('Producer is in error state');
    console.log(err);
});

/**
 * Express handler that publishes one message to a Kafka topic.
 *
 * Reads `req.body.topic` and `req.body.messages`, JSON-encodes the messages
 * and sends them to partition 0 of the topic through the module-level
 * `producer`. The HTTP response is written only after the send callback
 * fires, so the client learns whether the send actually succeeded.
 *
 * @param {object} req - Express request; `req.body` must contain `topic`
 *   (non-empty string) and `messages` (any JSON-serialisable value).
 * @param {object} res - Express response.
 * @param {Function} next - Unused; kept for the Express handler signature.
 * @returns {Promise<void>}
 */
const createProducer = async (req, res, next) => {
    const body = req.body || {};
    const topic = body.topic;

    // Reject malformed requests up front instead of handing Kafka an
    // undefined topic or an undefined message.
    if (typeof topic !== 'string' || topic.length === 0 || body.messages === undefined) {
        res.status(400).send({
            message: 'Request body must contain a non-empty "topic" and "messages"'
        });
        return;
    }

    // Declared locally: the original assigned an implicit global here.
    const payloads = [
        { topic: topic, messages: JSON.stringify(body.messages), partition: 0 }
    ];

    // createTopics expects an array of topic names, not a bare string.
    producer.createTopics([topic], function (createErr) {
        if (createErr) {
            // Best effort: the topic may already exist, so log and still try to send.
            console.log("createtopic error" + createErr);
        }

        // Send only after the create-topic request has been handled, so the
        // two calls no longer race each other.
        producer.send(payloads, function (sendErr, data) {
            if (sendErr) {
                console.log(sendErr);
                res.status(500).send({
                    message: 'Failed to send data from producer',
                    error: String(sendErr)
                });
                return;
            }
            console.log(payloads);
            res.send({
                message: 'Successfully send data from producer',
                payloads: payloads
            });
        });
    });
};

module.exports = {createProducer}

consumer.js

var kafka = require('kafka-node');

/**
 * Express handler that starts a Kafka consumer for the topic named in the
 * URL (`req.params.topic`) and logs every message value it receives.
 *
 * The response is only an acknowledgement that the consumer was created;
 * consumed messages are written to the console, not to the response.
 *
 * NOTE(review): a new client and consumer are created on every request and
 * never closed here — confirm whether they should be reused or closed.
 *
 * @param {object} req - Express request; `req.params.topic` is the topic name.
 * @param {object} res - Express response.
 * @param {Function} next - Unused; kept for the Express handler signature.
 * @returns {Promise<void>}
 */
const createConsumer = async (req, res, next) => {
    const topic = req.params.topic;

    // Declared locally: the original assigned Consumer/client/consumer as
    // implicit globals through a comma expression.
    const client = new kafka.KafkaClient();
    // Needed by the offsetOutOfRange handler below; the original left this
    // commented out, so that handler threw a ReferenceError.
    const offset = new kafka.Offset(client);
    const consumer = new kafka.Consumer(
        client,
        [{ topic: topic, offset: 0 }],
        { autoCommit: false }
    );

    consumer.on('message', function (message) {
        console.log('message : ', message.value);
    });

    consumer.on('error', function (err) {
        console.log('Error:', err);
    });

    // The event argument describes the topic/partition whose offset is out
    // of range. The original ignored it and dereferenced the string `topic`
    // from the URL as if it were that object.
    consumer.on('offsetOutOfRange', function (outOfRange) {
        console.log("absbs");
        outOfRange.maxNum = 2;
        offset.fetch([outOfRange], function (err, offsets) {
            if (err) {
                return console.error(err);
            }
            // Move to the smallest offset returned for that partition.
            const min = Math.min.apply(null, offsets[outOfRange.topic][outOfRange.partition]);
            consumer.setOffset(outOfRange.topic, outOfRange.partition, min);
        });
    });

    res.send({
        message: "consumer created : ",
        topic: topic
    });
};

module.exports = {createConsumer}

routes.js

const express = require("express");
const kafkaConsumer = require("../kafka/consumer") // create routes for consumer
const kafkaproducer = require("../kafka/producer") // create routes for producer

// Create the router instance; the original used `router` without ever
// defining it, which throws a ReferenceError when this module is loaded.
const router = express.Router();

// POST publishes a message through the producer handler.
router.post("/kafka/producer", kafkaproducer.createProducer);
// GET starts a consumer for the topic given in the URL.
router.get("/kafka/consumer/:topic", kafkaConsumer.createConsumer);

module.exports = router;

index.js

var express = require('express');
var app = express();
var kafka = require('kafka-node');
var router = require('./routes/routes');
const {
    PORT
  } = require("./config/config");

var bodyParser = require('body-parser');
// Parse JSON and URL-encoded request bodies before they reach the routes.
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({
    extended: true
}));

// Mount the routes before the server starts accepting connections.
app.use('/', router);

app.listen(PORT, function () {
    // Report the configured port; the original message hard-coded 5000.
    console.log(`Kafka producer running at ${PORT}`);
});

// Must be `module.exports`: the original assigned to `module.export`, so
// nothing was actually exported from this file.
module.exports = app;

如何在不借助数组等额外数据结构的情况下,只获取最后一条消息?

有什么办法可以删除主题吗?

谢谢,

4

2 回答 2

1

You have to set the consumer's offset to the latest one, which corresponds to the last published message for the given topic.

// NOTE(review): this snippet is a fragment — the closing `);` of the
// `new Consumer(...)` call is not shown.
consumer = new Consumer(client,
    // NOTE(review): `latest` is not defined anywhere in this snippet;
    // presumably it stands for the topic's latest offset, which has to be
    // obtained separately — confirm before use.
    [{ topic: topic, offset: latest}],
    {
        autoCommit: false,

    }

to delete a topic here is the command

kafka-topics.sh --zookeeper <zk>:2181 --delete --topic <topic name>

Note: This will have no impact if delete.topic.enable is not set to true.

于 2021-12-16T00:37:55.470 回答
0

我对代码进行了一些更改,现在我可以从我的主题中检索最新消息。

我在 offset.fetchLatestOffsets([topics], cb) 的回调中创建了消费者,并对消费者选项进行了一些更改。

请检查这个答案

于 2022-01-05T12:59:26.670 回答