1

考虑这个测试,其中一条消息从测试发送到主题“out”,并且测试的代码预计会使用它并通过向主题“in”发送消息来回复。为了通过,我想确保将消息发送到主题“in”。

it('...', async () => {
  /* initialize kafkaConsumer and kafkaProducer here */

  async function someCallback() {
    // ...
  }

  await kafkaConsumer.subscribe({ topic: 'in', fromBeginning: true })
  await kafkaConsumer.run({ eachMessage: someCallback })

  await kafkaProducer.send({ topic: 'out', messages: [{ key: '1', value: '2' }] })

  // How do I block here until someCallback is called?
})

我阅读了有关使用的信息done,但在定义测试本身时我不能拥有它async,我需要它才能使用await。有没有我不知道的不同方式?

4

2 回答 2

3

你可以看看我们如何测试 KafkaJS 本身以获得一些灵感。例如,这是一个基本的消费者测试

我们真的没有做任何花哨的事情,只是从回调中将消息添加到数组中eachMessage,然后等待一个定期检查我们是否达到预期消息数量的 Promise。像这样的东西:

it('consumes messages', async () => {
  const messages = [{ value: 'hello world' }]
  const consumedMessages = []

  consumer.run({
    eachMessage: ({ message }) => {
      consumedMessages.push(message);
    }
  })

  await producer.send({ topic, messages })

  await waitFor(() => consumedMessages.length === messages.length)
})

WherewaitFor本质上是一个函数,它返回一个 Promise 并启动一个 setTimeout 来检查谓词并在谓词为真时解析该 Promise(或者如果它遇到超时则拒绝)。

要记住的一些问题:

  • 每次运行都使用一个新groupId的,这样多次运行就不会相互干扰。
  • 出于同样的原因,在每次测试运行时使用一个新主题。
  • 如果您在消费者加入群组并订阅主题之前生成消息,则默认情况下不会显示这些消息。在生产之前订阅fromBeginning: true或等待您的消费者订阅并加入组(检测事件会在组加入时发出一个事件,您可以以与我们等待消息被消费相同的方式等待)。
于 2021-01-22T10:25:39.813 回答
1

在接受了 Tommy Brunn 的回答一段时间后,我发现了一些错误,我最终得到了这个:

export const waitForKafkaMessages = async (
  kafka: Kafka,
  messagesAmount: number,
  topic: string,
  fromBeginning: boolean,
  groupId: string,
): Promise<KafkaMessage[]> => {
  const consumer: Consumer = kafka.consumer({ groupId })
  await consumer.connect()
  await consumer.subscribe({ topic, fromBeginning })

  let resolveOnConsumption: (messages: KafkaMessage[]) => void
  let rejectOnError: (e: Error) => void

  const returnThisPromise = new Promise<KafkaMessage[]>((resolve, reject) => {
    resolveOnConsumption = resolve
    rejectOnError = reject
  }).finally(() => consumer.disconnect()) // disconnection is done here, reason why is explained below

  const messages: KafkaMessage[] = []
  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ message, partition, topic }) => {
      try {
        // eachMessage is called by eachBatch which can consume more than messagesAmount.
        // This is why we manually commit only messagesAmount messages.
        if (messages.length < messagesAmount) {
          messages.push(message)

          // +1 because we need to commit the next assigned offset.
          await consumer.commitOffsets([{ topic, partition, offset: (Number(message.offset) + 1).toString() }])
        }

        if (messages.length === messagesAmount) {
          // I think we should be able to close the connection here, but kafkajs has a bug which makes it hang if consumer.disconnect is called too soon after consumer.run .
          // This is why we close it in the promise's finally block

          resolveOnConsumption(messages)
        }
      } catch (e) {
        rejectOnError(e)
      }
    },
  })

  return returnThisPromise
}
于 2021-04-02T11:15:17.197 回答