我有这个非常简单的代码,它应该只是使用来自官方 GitHub的本地主机中的 Kafka 将一条简单的消息从生产者传输到消费者
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['kafka1:9092', 'kafka2:9092']
})
const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'test-group' })
const run = async () => {
// Producing
await producer.connect()
await producer.send({
topic: 'test-topic',
messages: [
{ value: 'Hello KafkaJS user!' },
],
})
// Consuming
await consumer.connect()
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
partition,
offset: message.offset,
value: message.value.toString(),
})
},
})
}
run().catch(console.error)
当我使用node index.js
(即此文件)执行此代码时,我得到了这两个错误中的许多:
{"level":"ERROR","timestamp":"2021-03-29T12:01:00.633Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo ENOTFOUND kafka1","broker":"kafka1:9092","clientId":"my-app","stack":"Error: getaddrinfo ENOTFOUND kafka1\n at GetAddrInfoReqWrap.onlookup [as oncomplete] (dns.js:60:26)"}
{"level":"ERROR","timestamp":"2021-03-29T12:01:00.635Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection error: getaddrinfo ENOTFOUND kafka1","retryCount":0,"retryTime":323}
所以看起来我猜在本地端口 9092 上定义代理的第一行不起作用?但我不知道如何纠正这一点。请问你能帮帮我吗?