我有一个带有 QueueTrigger 的 Azure Durable 函数。在内部,我有一些与子编排一起调用的 Azure 函数。这是代码
var processKatartItem = context.GetInput<KatartItem>();
if (!processKatartItem.ProcessLeadTime && !processKatartItem.ProcessPrice && !processKatartItem.ProcessStock)
{
await context.CallActivityWithRetryAsync("LLGFeedPreProductMessageProcessor", new RetryOptions(TimeSpan.FromSeconds(1), 5), processKatartItem);
}
var tasks = new Task[processKatartItem.vardatas.Count];
for (var i = 0; i < processKatartItem.vardatas.Count; i++)
{
tasks[i] = context.CallSubOrchestratorWithRetryAsync("LLGFeedProductOrchestrator", new RetryOptions(TimeSpan.FromSeconds(1), 5), new ProductProcessingModel
{
ProcessKatartItem = processKatartItem,
ProcessVardata = processKatartItem.vardatas[i]
});
}
await Task.WhenAll(tasks);
await context.CallActivityWithRetryAsync("LLGFeedFinalizeMessage", new RetryOptions(TimeSpan.FromSeconds(1), 5), processKatartItem);
这是 SubOrchestrator 的代码
var processModel = context.GetInput<ProductProcessingModel>();
await context.CallActivityWithRetryAsync("LLGFeedProductSpecificationProcessor", new RetryOptions(TimeSpan.FromSeconds(1), 5), processModel);
await context.CallActivityWithRetryAsync("LLGFeedProductMessageProcessor", new RetryOptions(TimeSpan.FromSeconds(1), 5), processModel);
await context.CallActivityWithRetryAsync("LLGFeedProductMappingProcessor", new RetryOptions(TimeSpan.FromSeconds(1), 5), processModel);
await context.CallActivityWithRetryAsync("LLGFeedProductPriceProcessor", new RetryOptions(TimeSpan.FromSeconds(1), 5), processModel);
这种结构的方式是,我们将消息及其子输入处理到数据库中,然后运行 MessageFinalization,通过调用我们网站上的 Api 将消息/产品索引到 Elastic Search 中。我们将大量消息排队,然后对其进行处理。
我遇到的问题是 Durable Function 执行前几个 Azure 函数,然后等待最后几个函数。这是 WebJobs 仪表板的屏幕截图
所以如您所见,已经处理了很多消息,但没有执行 LLGFeedFinalizeMessage 的 1 个实例。有没有办法可以确保我的整个管道得到执行,而不是先执行前几个函数,然后再执行所有函数。或者甚至是先进先出。那就是让编排器在启动新实例之前等待并完成其所有功能。
主机.json
{
"version": "2.0",
"extensions": {
"durableTask": {
"HubName": "LLGFeedTaskHub",
"ControlQueueBatchSize": 8,
"PartitionCount": 2,
"MaxConcurrentActivityFunctions": 10,
"MaxConcurrentOrchestratorFunctions": 2,
"AzureStorageConnectionStringName": "AzureWebJobsStorage"
},
"queues": {
"batchSize": 10,
"newBatchThreshold": 5
}
},
"functionTimeout": "00:10:00"
}