3

我目前正在使用 WebJobs SDK 来使用队列中发布的消息。

我的方法作为具有 [Microsoft.Azure.WebJobs.QueueTrigger(...)] 属性的一个参数并且可以正常触发。在某些情况下,该方法可以处理消息,但有时,我希望它拒绝消息,直到关键资源可用为止。

在这种情况下,我尝试抛出异常,但与参考资料相反,队列触发器立即再次触发(显然没有等待租约时间)。

有没有办法优雅地推迟消息处理?只是冻结等待关键资源的线程是否安全?

任何提示将不胜感激。

4

3 回答 3

1

我认为您不能在当前版本中推迟消息。

可能的解决方法

您可以延迟重新添加相同的消息并避免重复,您可以将其设置MaxDequeueCount1,这将在异常发生后将失败消息直接发送到毒队列:

        JobHostConfiguration configuration = new JobHostConfiguration();
        configuration.Queues.MaxDequeueCount = 1;

和消息处理器 - 延迟重新添加您的消息并抛出异常:

    public static void ProcessMessage([QueueTrigger("resource-heavy-queue")] string message, [Queue("resource-heavy-queue")]  CloudQueue originalQueue)
    {
        if ( /*Resource unavaliable*/)
        {
            var messageToReAdd = new CloudQueueMessage(message);
            originalQueue.AddMessage(messageToReAdd, null, TimeSpan.FromSeconds(10));
            throw new ResourcesNotAvailableException();
        }
    }

通过这种方式,您可以为您的资源实施某种退避策略。不幸的是,您必须手动处理一些问题:

  • 处理有毒消息 - 如果您不断重新添加相同的消息,您可能会陷入无限循环,因此您必须扩展您的消息模型以NumberOfRetries在每次重新添加时携带和增加它
  • 每次重新添加后消息都会有所不同,Id因此您不能依赖它们。InsertionTime
于 2014-11-25T23:15:30.940 回答
0

抱歉,无法推迟触发。您想要的是 Azure WebJobs SDK 没有的多重触发器(当消息和资源可用时触发)

有一些解决方法。如果该队列中的所有消息都需要关键资源,则可以冻结线程- 您正在实现信号量 - 否则,调度会变得棘手,因为如果您达到并行运行的最大函数数,SDK 将不会处理新消息。

我会做的不是触发队列消息,而是触发关键资源。当资源可用时,它会将消息放入另一个队列中,然后才检查是否有任何需要处理的消息需要该资源。

于 2014-11-26T16:37:30.083 回答
0

在这种情况下,我要做的是ICollector为我触发的同一个队列设置一个

public static async Task HandleMessagesAsync([ServiceBusTrigger("%QueueName%")] BrokeredMessage message, [ServiceBus("%QueueName%")]ICollector<BrokeredMessage> queue, TextWriter logger)

(该%QueueName%符号使 SDK 从 app.config 中获取值)

然后在处理程序中我正在做类似的事情

if (needToWait)
{
    var delayedMessage = new BrokeredMessage(originalMessageBody) { Label = originalLabel, MessageId = originalMessageId, ScheduledEnqueueTimeUtc = DateTime.UtcNow + this.ExecutionDelay };
    queue.Add(delayedMessage);
    return;
}

这样当前消息就完成了,但是一个具有相同属性的新消息被安排在定义的超时后传递。

在制作这个程序时没有线程被阻塞。

于 2018-05-03T08:05:33.517 回答