我正在尝试使用 kafka-node 将我们的应用程序与 Kafka 集成。我从该插件的站点获取了示例代码,并尝试将其模块化。后端使用的是 Confluent 的 Kafka Docker 镜像。
示例代码:
// Creates a single kafka-node Producer and exports it so that other modules
// can share it through `require`.
const kafka = require('kafka-node');

const { Producer } = kafka;

// No options are passed, so the client runs with kafka-node's defaults
// (broker address, connection timeout, ...). NOTE(review): confirm those
// defaults match the broker exposed by the Docker setup.
const client = new kafka.KafkaClient();
const producer = new Producer(client);

producer.on('ready', () => {
  // Intentionally a no-op; kept as a hook for start-up logic.
});

// Report errors instead of discarding them silently, so that connection
// problems (timeouts, for example) are visible in the logs.
producer.on('error', (err) => {
  console.error('Kafka producer error:', err);
});

exports.producer = producer;
在另一个文件中,我使用以下代码访问这个 producer:
var { producer } = require('./test');
/**
 * Wraps the arguments in a `Message`, serializes it to JSON and sends it to
 * partition 0 of the `chat` topic through the shared producer.
 *
 * `Message` and `producer` are resolved from the enclosing module scope.
 *
 * @param {*} text - Forwarded to the `Message` constructor.
 * @param {*} sender - Forwarded to the `Message` constructor.
 * @param {*} channel_id - Forwarded to the `Message` constructor.
 * @param {*} type - Forwarded to the `Message` constructor.
 * @returns {void}
 */
function writeToQueue(text, sender, channel_id, type) {
  // Declared locally: assigning without a declaration creates implicit
  // globals and throws a ReferenceError in strict mode / ES modules.
  const message = new Message(text, sender, channel_id, type);
  console.log(message);

  const payloads = [
    { topic: 'chat', messages: JSON.stringify(message), partition: 0 },
  ];

  producer.send(payloads, (err, data) => {
    if (err) {
      // Report the failure instead of ignoring it.
      console.error('Failed to send message to Kafka:', err);
      return;
    }
    console.log(data);
  });
}
消息能够正常写入 Kafka 主题,一切似乎都运行良好;唯一的问题是,运行一段时间后,我会收到以下错误:
events.js:292
throw er; // Unhandled 'error' event
^
TimeoutError: Connection timeout of 10000ms exceeded
at new TimeoutError (G:\projects\nodejs\messaging\node_modules\kafka-node\lib\errors\TimeoutError.js:6:9)
at Timeout.<anonymous> (G:\projects\nodejs\messaging\node_modules\kafka-node\lib\kafkaClient.js:253:13)
at listOnTimeout (internal/timers.js:554:17)
at processTimers (internal/timers.js:497:7)
Emitted 'error' event on Producer instance at:
at KafkaClient.<anonymous> (G:\projects\nodejs\messaging\node_modules\kafka-node\lib\baseProducer.js:101:10)
at KafkaClient.emit (events.js:315:20)
at G:\projects\nodejs\messaging\node_modules\kafka-node\lib\kafkaClient.js:165:16
at G:\projects\nodejs\messaging\node_modules\async\dist\async.js:3888:9
at G:\projects\nodejs\messaging\node_modules\async\dist\async.js:473:16
at iterateeCallback (G:\projects\nodejs\messaging\node_modules\async\dist\async.js:988:17)
at G:\projects\nodejs\messaging\node_modules\async\dist\async.js:969:16
at G:\projects\nodejs\messaging\node_modules\async\dist\async.js:3885:13
at G:\projects\nodejs\messaging\node_modules\kafka-node\lib\kafkaClient.js:207:9
at G:\projects\nodejs\messaging\node_modules\async\dist\async.js:969:16
PS G:\projects\nodejs\messaging>
我完全不知所措:为了解决这个问题,我已经用尽了自己对 Node.js 的全部了解(顺便说一句,本来也不多)。