0

我们正在使用@azure/service-bus包来处理来自多个主题的消息批次。

我们用来每 2 秒从主题中获取 20 条消息的代码如下所示。

       let isProcessing: boolean = false;

       setInterval(async () => {

        if (isProcessing === false) {

            isProcessing = true;

            try {

                const messages: Array<ServiceBusMessage>
                    = await receiver.receiveMessages(Configuration.SB.batchSize as number);

                if (messages.length > 0) {

                    this.logger.info(`[SB] ${topic} - ${messages.length} require processing`);

                    await Promise.all([
                        ...messages.map(message => this.handleMsg(receiver, message, topic, moduleRef, handler))
                    ]).catch(error => {
                        this.logger.error(error.message, error);
                    });

                }

                isProcessing = false;

            } catch (error) {

                this.logger.error(error.message, error);

                isProcessing = false;

            }

        }

    }, Configuration.SB.tickInterval as number);

我的问题是 - 这是最好的方法吗?有没有更好的办法?它可以工作并且性能相当好,但我认为我们有时会丢失 receiveAndDelete 消息,如果我们的实现,我正在尝试锻炼

谢谢你的帮助

4

1 回答 1

1

它可以工作并且性能相当好,但我认为我们有时会丢失 receiveAndDelete 消息,如果我们的实现,我正在尝试锻炼

有两种接收消息的模式

  • 不安全的ReceiveAndDelete
  • 安全PeekLock

使用ReceiveAndDelete模式时,客户端收到消息的那一刻,它们会自动从服务器中删除。所以这是最多一次交付。

一条消息被“租用”给客户端最多 5 分钟,客户端必须通过PeekLock请求消息完成或如果无法处理它通过取消/死信来确认成功处理。如果这些操作都没有在定义的租用时间内发生(不必严格限制为 5 分钟,也可以更短),则重试消息,直到MaxDeliveryCount超过最大传递尝试次数 ( ) 并且消息失效-字母。请注意,消息永远不会丢失。即使它未能处理并且是死信。因此,这是至少一次交付,可能更适合您的场景。它将对您编写客户端的方式产生轻微影响,但不会产生巨大的变化。

于 2019-11-19T17:31:12.473 回答