1

我有一个带有 QueueTrigger 的 Azure Durable 函数。在内部,我有一些与子编排一起调用的 Azure 函数。这是代码

 var processKatartItem = context.GetInput<KatartItem>();

if (!processKatartItem.ProcessLeadTime && !processKatartItem.ProcessPrice && !processKatartItem.ProcessStock)
{
    await context.CallActivityWithRetryAsync("LLGFeedPreProductMessageProcessor", new RetryOptions(TimeSpan.FromSeconds(1), 5), processKatartItem);
}

var tasks = new Task[processKatartItem.vardatas.Count];

for (var i = 0; i < processKatartItem.vardatas.Count; i++)
{
    tasks[i] = context.CallSubOrchestratorWithRetryAsync("LLGFeedProductOrchestrator", new RetryOptions(TimeSpan.FromSeconds(1), 5), new ProductProcessingModel
    {
        ProcessKatartItem = processKatartItem,
        ProcessVardata = processKatartItem.vardatas[i]
    });
}

await Task.WhenAll(tasks);
await context.CallActivityWithRetryAsync("LLGFeedFinalizeMessage", new RetryOptions(TimeSpan.FromSeconds(1), 5), processKatartItem);

这是 SubOrchestrator 的代码

var processModel = context.GetInput<ProductProcessingModel>();

await context.CallActivityWithRetryAsync("LLGFeedProductSpecificationProcessor", new RetryOptions(TimeSpan.FromSeconds(1), 5), processModel);
await context.CallActivityWithRetryAsync("LLGFeedProductMessageProcessor", new RetryOptions(TimeSpan.FromSeconds(1), 5), processModel);
await context.CallActivityWithRetryAsync("LLGFeedProductMappingProcessor", new RetryOptions(TimeSpan.FromSeconds(1), 5), processModel);
await context.CallActivityWithRetryAsync("LLGFeedProductPriceProcessor", new RetryOptions(TimeSpan.FromSeconds(1), 5), processModel);

这种结构的方式是,我们将消息及其子输入处理到数据库中,然后运行 ​​MessageFinalization,通过调用我们网站上的 Api 将消息/产品索引到 Elastic Search 中。我们将大量消息排队,然后对其进行处理。

我遇到的问题是 Durable Function 执行前几个 Azure 函数,然后等待最后几个函数。这是 WebJobs 仪表板的屏幕截图

在此处输入图像描述

所以如您所见,已经处理了很多消息,但没有执行 LLGFeedFinalizeMessage 的 1 个实例。有没有办法可以确保我的整个管道得到执行,而不是先执行前几个函数,然后再执行所有函数。或者甚至是先进先出。那就是让编排器在启动新实例之前等待并完成其所有功能。

主机.json

{
    "version": "2.0",
    "extensions": {
      "durableTask": {
        "HubName": "LLGFeedTaskHub",
        "ControlQueueBatchSize": 8,
        "PartitionCount": 2,
        "MaxConcurrentActivityFunctions": 10,
        "MaxConcurrentOrchestratorFunctions": 2,
        "AzureStorageConnectionStringName": "AzureWebJobsStorage"
      },
      "queues": {
        "batchSize": 10,
        "newBatchThreshold": 5
      }
    },
    "functionTimeout": "00:10:00"
}
4

0 回答 0