我们正在使用@azure/service-bus包来处理来自多个主题的消息批次。
我们用来每 2 秒从主题中获取 20 条消息的代码如下所示。
let isProcessing: boolean = false;
setInterval(async () => {
if (isProcessing === false) {
isProcessing = true;
try {
const messages: Array<ServiceBusMessage>
= await receiver.receiveMessages(Configuration.SB.batchSize as number);
if (messages.length > 0) {
this.logger.info(`[SB] ${topic} - ${messages.length} require processing`);
await Promise.all([
...messages.map(message => this.handleMsg(receiver, message, topic, moduleRef, handler))
]).catch(error => {
this.logger.error(error.message, error);
});
}
isProcessing = false;
} catch (error) {
this.logger.error(error.message, error);
isProcessing = false;
}
}
}, Configuration.SB.tickInterval as number);
我的问题是 - 这是最好的方法吗?有没有更好的办法?它可以工作并且性能相当好,但我认为我们有时会丢失 receiveAndDelete 消息,如果我们的实现,我正在尝试锻炼
谢谢你的帮助