我目前正在使用 WebJobs SDK 来使用队列中发布的消息。
我的方法作为具有 [Microsoft.Azure.WebJobs.QueueTrigger(...)] 属性的一个参数并且可以正常触发。在某些情况下,该方法可以处理消息,但有时,我希望它拒绝消息,直到关键资源可用为止。
在这种情况下,我尝试抛出异常,但与参考资料相反,队列触发器立即再次触发(显然没有等待租约时间)。
有没有办法优雅地推迟消息处理?只是冻结等待关键资源的线程是否安全?
任何提示将不胜感激。
我目前正在使用 WebJobs SDK 来使用队列中发布的消息。
我的方法作为具有 [Microsoft.Azure.WebJobs.QueueTrigger(...)] 属性的一个参数并且可以正常触发。在某些情况下,该方法可以处理消息,但有时,我希望它拒绝消息,直到关键资源可用为止。
在这种情况下,我尝试抛出异常,但与参考资料相反,队列触发器立即再次触发(显然没有等待租约时间)。
有没有办法优雅地推迟消息处理?只是冻结等待关键资源的线程是否安全?
任何提示将不胜感激。
我认为您不能在当前版本中推迟消息。
可能的解决方法
您可以延迟重新添加相同的消息并避免重复,您可以将其设置MaxDequeueCount
为1
,这将在异常发生后将失败消息直接发送到毒队列:
JobHostConfiguration configuration = new JobHostConfiguration();
configuration.Queues.MaxDequeueCount = 1;
和消息处理器 - 延迟重新添加您的消息并抛出异常:
public static void ProcessMessage([QueueTrigger("resource-heavy-queue")] string message, [Queue("resource-heavy-queue")] CloudQueue originalQueue)
{
if ( /*Resource unavaliable*/)
{
var messageToReAdd = new CloudQueueMessage(message);
originalQueue.AddMessage(messageToReAdd, null, TimeSpan.FromSeconds(10));
throw new ResourcesNotAvailableException();
}
}
通过这种方式,您可以为您的资源实施某种退避策略。不幸的是,您必须手动处理一些问题:
NumberOfRetries
在每次重新添加时携带和增加它Id
因此您不能依赖它们。InsertionTime
抱歉,无法推迟触发。您想要的是 Azure WebJobs SDK 没有的多重触发器(当消息和资源可用时触发)
有一些解决方法。如果该队列中的所有消息都需要关键资源,则可以冻结线程- 您正在实现信号量 - 否则,调度会变得棘手,因为如果您达到并行运行的最大函数数,SDK 将不会处理新消息。
我会做的不是触发队列消息,而是触发关键资源。当资源可用时,它会将消息放入另一个队列中,然后才检查是否有任何需要处理的消息需要该资源。
在这种情况下,我要做的是ICollector
为我触发的同一个队列设置一个
public static async Task HandleMessagesAsync([ServiceBusTrigger("%QueueName%")] BrokeredMessage message, [ServiceBus("%QueueName%")]ICollector<BrokeredMessage> queue, TextWriter logger)
(该%QueueName%
符号使 SDK 从 app.config 中获取值)
然后在处理程序中我正在做类似的事情
if (needToWait)
{
var delayedMessage = new BrokeredMessage(originalMessageBody) { Label = originalLabel, MessageId = originalMessageId, ScheduledEnqueueTimeUtc = DateTime.UtcNow + this.ExecutionDelay };
queue.Add(delayedMessage);
return;
}
这样当前消息就完成了,但是一个具有相同属性的新消息被安排在定义的超时后传递。
在制作这个程序时没有线程被阻塞。