2

我有一个运行长时间运行任务的队列触发器。遇到锁定持续时间和更新的各种问题,所以我决定在整个过程的一开始就手动完成消息。如果作业遇到错误,则将消息发送回队列。

在下面的日志数据中,您可以看到它收到消息,完成它,启动任务,记录一个lock supplied is invalid似乎没有任何影响的错误,然后完成任务。我想停止这个错误。这很烦人。

[FunctionName("QueueTrigger")]
public async Task RunQueue([ServiceBusTrigger("queuename", Connection = "cn")] Message message, MessageReceiver messageReceiver, [ServiceBus("queuename", Connection = "cn")] IAsyncCollector<Message> queue)
{
    var msg = JsonConvert.DeserializeObject<RequestObj>(Encoding.UTF8.GetString(message.Body));
    logger.LogInfo("*** Before complete");
    await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
    logger.LogInfo("*** After complete");
    try
    {
        if(msg.Whatever > 1)   throw new Exception("Bogus exception thrown for testing.");

        var result = await RunTheTask(msg);
        if (!result.IsSuccessful)
        {
            logger.LogError(result.Failure);
            throw new ApplicationException(result.Failure);
        };
    }
    catch (Exception ex)
    {
        logger.LogError(ex.Message);
        logger.LogInfo("*** About to requeue");
        await queue.AddAsync(new Message(message.Body));
        logger.LogSuccess("*** Queued the message again.");
    }
}

host.json文件有serviceBus这样的配置(autoComplete关闭)

"serviceBus": {
      "prefetchCount": 1,
      "messageHandlerOptions": {
        "autoComplete": false,
        "maxConcurrentCalls": 1
      },
      "SessionHandlerOptions": {
        "autoComplete": false
      },
      "BatchOptions": {
        "AutoComplete": false
      }

在这两种情况下,代码都按预期工作,但在这两种情况下,我总是看到“无效锁定”错误

一些日志内容:

[2021-02-01T20:46:27.248Z] Executing 'QueueTrigger' (Reason='(null)', Id=520a670c-e636-49b8-96b5-7502f6b479ac)
[2021-02-01T20:46:27.251Z] Trigger Details: MessageId: d7af56ed083947e99ce0fc468f111e4d, SequenceNumber: 37, DeliveryCount: 1, EnqueuedTimeUtc: 2021-02-01T20:40:44.5390000Z, LockedUntilUtc: 2021-02-01T20:41:44.5550000Z, SessionId: (null)
[2021-02-01T20:46:28.331Z] *** Before complete
[2021-02-01T20:46:28.471Z] *** After complete
[2021-02-01T20:46:28.498Z] Running the Task
[2021-02-01T20:46:28.826Z] Message processing error (Action=RenewLock, ClientId=whatever, EntityPath=queueName, Endpoint=blah.servicebus.windows.net)
[2021-02-01T20:46:28.830Z] Microsoft.Azure.ServiceBus: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. Reference:30780d70-2887-4923-8725-329ab94dd4b2, TrackingId:e3e85f42-1c62-44da-b858-1c856f972a1f_B14, SystemTracker:x:Queue:queueName, Timestamp:2021-02-01T20:40:46.
[2021-02-01T20:46:38.855Z] Finished the Task
[2021-02-01T20:46:42.221Z] Executed 'QueueTrigger' (Succeeded, Id=520a670c-e636-49b8-96b5-7502f6b479ac, Duration=14992ms)
4

2 回答 2

0

我认为你应该将'complete'方法移到最后,该功能会自动刷新锁过期时间以保持它在正常情况下可用。但是一旦你使用了'complete'方法,函数会生成最后一个锁,如果你的后续逻辑超过了锁的时间,就会抛出锁过期异常。大家可以看看下面的代码,比较一下使用完整和不使用完整的区别:

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace FunctionApp71
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task Run([ServiceBusTrigger("queuename", Connection = "cn")] Message message, MessageReceiver messageReceiver, [ServiceBus("queuename", Connection = "cn")] IAsyncCollector<Message> queue, ILogger logger)
        {
            logger.LogInformation("When the message comes in, the expire time is: "+message.SystemProperties.LockedUntilUtc.ToString());//Now the time is 2/4/2021 8:29:00 AM

            Thread.Sleep(6000);

            logger.LogInformation("After 6 seconds, the expired time is :"+message.SystemProperties.LockedUntilUtc.ToString());//The time of the lock is renewed, so now the time is 2/4/2021 8:29:04 AM

            //When you complete the message, there is no way to renew the lock because the lock is not exist(Azure function stop renew the lock.).
            await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
            logger.LogInformation("After complete the message, the expired time is :"+message.SystemProperties.LockedUntilUtc.ToString());//Now the time is still 2/4/2021 8:29:04 AM

            Thread.Sleep(6 * 1000);//Your try catch is here.

            logger.LogInformation("After complete message and 6 seconds later, the expired time is :"+message.SystemProperties.LockedUntilUtc.ToString());
        }
    }
}

在此处输入图像描述

于 2021-02-03T02:59:38.893 回答
0

记录了一个问题:https ://github.com/Azure/azure-functions-servicebus-extension/issues/137 问题有一个链接到 repo 的 repro。

于 2021-02-26T22:21:08.980 回答