2

我正在尝试使用 kafkajs 管理客户端获取我的 kafka 经纪人的主题元数据。我已经用 Node.js + express.js 编写了我的服务器。

这是我的index.js文件,它是 npm 的入口点。

'use strict';

const express = require('express');
const bodyParser = require('body-parser');
const Admin = require('./Admin/create-admin-client');
const TopicMetaData = require('./Admin/topic-metadata-fetch');

const app = express();
app.use(bodyParser.urlencoded({ extended: false }));
app.use(bodyParser.json());

const adminConfig = new Admin({
    clientId: 'admin-client-4981',
    brokers: ['localhost:9092']
})

const admin = adminConfig.getAdmin();




.
..
...
// Handles the admin connection, disconnection, and other routes here
...
..
.




//This is where the error is
app.post('/api/v1/dev/admin/topicmetadata', (req, res) => {
    const topic = req.body;
    const topicmetadata = new TopicMetaData(admin);

    topicmetadata.setTopicConfig(topic);
    topicmetadata.commit(req, res);
});

app.listen(4040);

这是一个create-admin-client.js文件,它检索管理对象。

'use strict';

const { Kafka } = require('kafkajs');

class Admin {
    constructor(kafkaConfig) {
         this.kafka = new Kafka({
            clientId: kafkaConfig.clientId,
            brokers: kafkaConfig.brokers
        });
    }

    getAdmin() {
        return this.kafka.admin();
    };
}

module.exports = Admin;

这是topics-metadata-fetch.js获取主题元数据的文件。

'use strict';

class TopicMetaData {

    constructor(admin) {
        this.admin = admin;
    }

    setTopicConfig(topicConfig) {
        this.topic = topicConfig;
    }

    commit(req, res) {
        this.admin.fetchTopicMetadata({
            topics: this.topic
        })
        .then((topics) => {
            console.log("Topic MetaData Fetched Successfully!");
            res.status(200).send({
                topics
            });
        })
        .catch((err) => {
            console.error(err);
            res.status(500).send(err);
        })
    }
}

module.exports = TopicMetaData;

每当我发送 POST 请求以获取主题的元数据时(例如“SERVICE-TYPES”,我已成功创建主题),使用req.bodyas

{
    "topic": "SERVICE-TYPES",
    "partitions": [{
        "partitionErrorCode": 0,
        "partitionId": 0,
        "leader": 0,
        "replicas": [0],
        "isr": [0]
    }]
}

它返回一个TypeError: topics.forEach is not a function错误。我哪里做错了?

4

1 回答 1

2

我发现我的 http 请求正文应该具有以下结构:

{
    "topics": [{
        "topic": "SERVICE_TYPES",
        "partitions": [{
            "partitionErrorCode": 0,
            "partitionId": 0,
            "leader": 0,
            "replicas": [0],
            "isr": [0]
        }]
    }]
}

我的路由处理程序应该是这样的:

app.get('/api/v1/dev/admin/topicmetadata', (req, res) => {
        //topic should be array of the topics
        const topic = req.body.topics;
        const topicmetadata = new TopicMetaData(admin);

        topicmetadata.setTopicConfig(topic);
        topicmetadata.commit(req, res);
});

所以我传递了错误的请求正文结构。

于 2020-02-10T15:52:29.367 回答