1

我有这个非常简单的代码,它应该只是使用来自官方 GitHub的本地主机中的 Kafka 将一条简单的消息从生产者传输到消费者

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092', 'kafka2:9092']
})

const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'test-group' })

const run = async () => {
  // Producing
  await producer.connect()
  await producer.send({
    topic: 'test-topic',
    messages: [
      { value: 'Hello KafkaJS user!' },
    ],
  })

  // Consuming
  await consumer.connect()
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      })
    },
  })
}

run().catch(console.error)

当我使用node index.js(即此文件)执行此代码时,我得到了这两个错误中的许多:

{"level":"ERROR","timestamp":"2021-03-29T12:01:00.633Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo ENOTFOUND kafka1","broker":"kafka1:9092","clientId":"my-app","stack":"Error: getaddrinfo ENOTFOUND kafka1\n    at GetAddrInfoReqWrap.onlookup [as oncomplete] (dns.js:60:26)"}

{"level":"ERROR","timestamp":"2021-03-29T12:01:00.635Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection error: getaddrinfo ENOTFOUND kafka1","retryCount":0,"retryTime":323}

所以看起来我猜在本地端口 9092 上定义代理的第一行不起作用?但我不知道如何纠正这一点。请问你能帮帮我吗?

4

1 回答 1

0

感谢 OneCricketeer,我发现我没有正确启动本地 kafka 服务器,因此第一行无法正常工作。

非常感谢他!

于 2021-03-29T14:34:52.137 回答