4

我开发了一个触发新服务总线队列消息的 Azure Durable Functions 应用程序。当没有发生错误时它可以正常工作,但是当活动函数中发生错误时,它会记录它失败但消息从队列中永远消失了。可能是什么原因造成的,如何防止消息在出错时从队列中消失?

这是可复现的代码,是VS2017中新的Azure Function模板生成的代码,只是城市为“西雅图”时添加了一个例外,它是ServicebusTrigger而不是HttpTrigger。

            [FunctionName("Test")]
    public static async Task<List<string>> RunOrchestrator(
        [OrchestrationTrigger] DurableOrchestrationContext context)
    {
        var outputs = new List<string>();

        // Replace "hello" with the name of your Durable Activity Function.
        outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "Tokyo"));
        outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "Seattle"));
        outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "London"));

        // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
        return outputs;
    }

    [FunctionName("Test_Hello")]
    public static string SayHello([ActivityTrigger] string name, ILogger log)
    {
        log.LogInformation($"Saying hello to {name}.");
        if (name == "Seattle")
            throw new Exception("An error occurs");
        return $"Hello {name}!";
    }

    [FunctionName("Test_HttpStart")]
    public static async Task ServiceBusStart(
        [ServiceBusTrigger("somequeue", Connection = "ServiceBusQueueListenerConnectionString")]string queuemsg,
        [OrchestrationClient]DurableOrchestrationClient starter,
        ILogger log)
    {
        // Function input comes from the request content.
        var msg = JsonConvert.DeserializeObject<IncomingMessage>(queuemsg);
        string instanceId = await starter.StartNewAsync("Test", msg);
        log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
    }

更新:当我在编排客户端功能中出现异常时,如果重试失败 x 次,它会执行正确的操作,例如重试并将消息放入死信队列。

所以我设法通过使用这个while循环更新客户端函数来解决这个问题,检查失败/终止/取消状态。

    [FunctionName("Test_HttpStart")]
    public static async Task ServiceBusStart(
        [ServiceBusTrigger("somequeue", Connection = "ServiceBusQueueListenerConnectionString")]string queuemsg,
        [OrchestrationClient]DurableOrchestrationClient starter,
        ILogger log)
    {
        // Function input comes from the request content.
        var msg = JsonConvert.DeserializeObject<IncomingMessage>(queuemsg);
        string instanceId = await starter.StartNewAsync("Test", msg);
        log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

        var status = await starter.GetStatusAsync(instanceId);

        while (status.RuntimeStatus != OrchestrationRuntimeStatus.Completed)
        {
            System.Threading.Thread.Sleep(1000);
            status = await starter.GetStatusAsync(instanceId);
            if (status.RuntimeStatus == OrchestrationRuntimeStatus.Failed 
                || status.RuntimeStatus == OrchestrationRuntimeStatus.Terminated
                || status.RuntimeStatus == OrchestrationRuntimeStatus.Canceled)
            {
                throw new Exception("Orchestration failed with error: " + status.Output);
            }
        }

    }

然而,这对我来说似乎是一个 hack,而且我还没有在任何 MS 示例代码中看到这种类型的代码。我想这应该由持久功能框架来处理。是否有另一种方法可以使服务总线触发器在持久功能中工作?

4

2 回答 2

3

此行为是设计使然。启动编排是异步的 - 即StartNewAsyncAPI 不会自动等待编排运行或完成。在内部,StartNewAsync只需将消息放入 Azure 存储队列并将条目写入 Azure 存储表。如果这成功发生,那么您的服务总线功能将继续运行并成功完成,此时消息将被删除。

如果您确实需要服务总线队列消息重试,您的解决方法是可以接受的,但我质疑您为什么需要这样做。编排本身可以在不依赖服务总线的情况下管理自己的重试。例如,您可以使用CallActivityWithRetryAsync在编排内部重试。

请参阅 Durable Functions 文档的错误处理主题。

于 2019-01-06T19:38:09.503 回答
1

我知道这是一个旧线程,但我想分享我是如何使用ServiceBusTriggerand来实现它的WaitForCompletionOrCreateCheckStatusResponseAsync

[FunctionName(nameof(QueueTriggerFunction))]
public async Task QueueTriggerFunction(
    [ServiceBusTrigger("queue-name", Connection = "connectionstring-key")]string queueMessage,
    MessageReceiver messageReceiver,
    string lockToken,
    string messageId,
    [DurableClient] IDurableOrchestrationClient starter,
    ILogger log)
{
    //note: autocomplete is disabled
    try
    {
        //start durable function
        var instanceId = await starter.StartNewAsync(nameof(OrchestratorFunction), queueMessage);

        //get the payload (we want to use the status uri)
        var payload = starter.CreateHttpManagementPayload(instanceId);

        //instruct QueueTriggerFunction to wait for response
        await starter.WaitForCompletionOrCreateCheckStatusResponseAsync(new HttpRequestMessage(HttpMethod.Get, payload.StatusQueryGetUri), instanceId);

        //response ready, get status
        var status = await starter.GetStatusAsync(instanceId);

        //act on status
        if (status.RuntimeStatus == OrchestrationRuntimeStatus.Completed)
        {
            //like completing the message
            await messageReceiver.CompleteAsync(lockToken);
            log.LogInformation($"{nameof(Functions)}.{nameof(QueueTriggerFunction)}: {nameof(OrchestratorFunction)} succeeded [MessageId={messageId}]");
        }
        else
        {
            //or deadletter the sob
            await messageReceiver.DeadLetterAsync(lockToken);
            log.LogError($"{nameof(Functions)}.{nameof(QueueTriggerFunction)}: {nameof(OrchestratorFunction)} failed [MessageId={messageId}]");
        }
    }
    catch (Exception ex)
    {
        //not sure what went wrong, let the lock expire and try again (until max retry attempts is reached)
        log.LogError(ex, $"{nameof(Functions)}.{nameof(QueueTriggerFunction)}: handler failed [MessageId={messageId}]");
    }
}

问题是,互联网上的所有示例都使用HttpTrigger并使用该触发器的 httprequest 来检查是否完成,但ServiceBusTrigger. 此外,我认为这不正确,您应该使用有效负载调用中的状态 uri,就像我在这里使用协调器函数的 instanceId 所做的那样。

于 2021-01-17T14:29:27.587 回答