在采纳 Tommy Brunn 的回答并使用了一段时间后，我发现其中存在一些错误，最终改成了下面这个版本：
/**
 * Waits until `messagesAmount` messages have been consumed from `topic` and returns them.
 *
 * A dedicated consumer is created for `groupId`, connected, subscribed and run with
 * auto-commit disabled. Offsets are committed by hand, one message at a time, and only
 * for the messages that end up in the result, so surplus messages delivered alongside
 * them stay uncommitted. The consumer is disconnected before the returned promise
 * settles, on success and on failure alike.
 *
 * @param kafka - Client used to create the consumer.
 * @param messagesAmount - Number of messages to collect; values `<= 0` resolve to `[]`
 *   immediately without creating a consumer.
 * @param topic - Topic to subscribe to.
 * @param fromBeginning - Passed through to `consumer.subscribe`.
 * @param groupId - Consumer group id passed to `kafka.consumer`.
 * @returns The collected messages, in the order `eachMessage` delivered them.
 * @throws Whatever `connect`, `subscribe`, `run`, `commitOffsets` or the final
 *   `disconnect` rejected with.
 */
export const waitForKafkaMessages = async (
  kafka: Kafka,
  messagesAmount: number,
  topic: string,
  fromBeginning: boolean,
  groupId: string,
): Promise<KafkaMessage[]> => {
  // Nothing to wait for. Without this guard the completion check below could only run
  // once some message happened to arrive, so the call could hang indefinitely.
  if (messagesAmount <= 0) {
    return []
  }

  const consumer: Consumer = kafka.consumer({ groupId })
  const messages: KafkaMessage[] = []

  try {
    await consumer.connect()
    await consumer.subscribe({ topic, fromBeginning })

    // Settles when enough messages were collected, or when running/handling fails.
    await new Promise<void>((resolve, reject) => {
      consumer
        .run({
          autoCommit: false,
          eachMessage: async ({ message, partition, topic: messageTopic }) => {
            try {
              // eachMessage is called by eachBatch which can consume more than messagesAmount.
              // This is why we manually commit only messagesAmount messages.
              if (messages.length < messagesAmount) {
                messages.push(message)
                // +1 because we need to commit the next assigned offset.
                await consumer.commitOffsets([
                  { topic: messageTopic, partition, offset: (Number(message.offset) + 1).toString() },
                ])
              }
              // `>=` rather than `===` so a non-integer messagesAmount still terminates.
              if (messages.length >= messagesAmount) {
                // Do NOT disconnect from inside the handler: per the original author's note,
                // kafkajs hangs if consumer.disconnect is called too soon after consumer.run.
                // Disconnection happens after this promise settles, outside eachMessage.
                resolve()
              }
            } catch (e) {
              reject(e)
            }
          },
        })
        // A failure to start the consumer must settle the wait instead of leaving it pending.
        .catch(reject)
    })
  } catch (e) {
    // Best-effort cleanup so the consumer is not leaked; a secondary disconnect failure
    // is ignored so that the original error is the one the caller sees.
    await consumer.disconnect().catch(() => undefined)
    throw e
  }

  await consumer.disconnect()
  return messages
}