我开发了一个触发新服务总线队列消息的 Azure Durable Functions 应用程序。当没有发生错误时它可以正常工作,但是当活动函数中发生错误时,它会记录它失败但消息从队列中永远消失了。可能是什么原因造成的,如何防止消息在出错时从队列中消失?
这是可复现的代码,是VS2017中新的Azure Function模板生成的代码,只是城市为“西雅图”时添加了一个例外,它是ServicebusTrigger而不是HttpTrigger。
[FunctionName("Test")]
public static async Task<List<string>> RunOrchestrator(
[OrchestrationTrigger] DurableOrchestrationContext context)
{
var outputs = new List<string>();
// Replace "hello" with the name of your Durable Activity Function.
outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "Tokyo"));
outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "Seattle"));
outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "London"));
// returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
return outputs;
}
[FunctionName("Test_Hello")]
public static string SayHello([ActivityTrigger] string name, ILogger log)
{
log.LogInformation($"Saying hello to {name}.");
if (name == "Seattle")
throw new Exception("An error occurs");
return $"Hello {name}!";
}
[FunctionName("Test_HttpStart")]
public static async Task ServiceBusStart(
[ServiceBusTrigger("somequeue", Connection = "ServiceBusQueueListenerConnectionString")]string queuemsg,
[OrchestrationClient]DurableOrchestrationClient starter,
ILogger log)
{
// Function input comes from the request content.
var msg = JsonConvert.DeserializeObject<IncomingMessage>(queuemsg);
string instanceId = await starter.StartNewAsync("Test", msg);
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
}
更新:当我在编排客户端功能中出现异常时,如果重试失败 x 次,它会执行正确的操作,例如重试并将消息放入死信队列。
所以我设法通过使用这个while循环更新客户端函数来解决这个问题,检查失败/终止/取消状态。
[FunctionName("Test_HttpStart")]
public static async Task ServiceBusStart(
[ServiceBusTrigger("somequeue", Connection = "ServiceBusQueueListenerConnectionString")]string queuemsg,
[OrchestrationClient]DurableOrchestrationClient starter,
ILogger log)
{
// Function input comes from the request content.
var msg = JsonConvert.DeserializeObject<IncomingMessage>(queuemsg);
string instanceId = await starter.StartNewAsync("Test", msg);
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
var status = await starter.GetStatusAsync(instanceId);
while (status.RuntimeStatus != OrchestrationRuntimeStatus.Completed)
{
System.Threading.Thread.Sleep(1000);
status = await starter.GetStatusAsync(instanceId);
if (status.RuntimeStatus == OrchestrationRuntimeStatus.Failed
|| status.RuntimeStatus == OrchestrationRuntimeStatus.Terminated
|| status.RuntimeStatus == OrchestrationRuntimeStatus.Canceled)
{
throw new Exception("Orchestration failed with error: " + status.Output);
}
}
}
然而,这对我来说似乎是一个 hack,而且我还没有在任何 MS 示例代码中看到这种类型的代码。我想这应该由持久功能框架来处理。是否有另一种方法可以使服务总线触发器在持久功能中工作?