4

我正在尝试实现 Azure Durable Function 工作流。

每隔 6 分钟,我就有一个 Azure TimerTrigger 函数调用一个 Azure 编排函数 (OrchestrationTrigger),该函数又启动许多活动函数 (ActivityTrigger)。

然而,有时,Orchestration 函数会在几秒钟内被调用两次!这是一个大问题,因为我的活动函数不是幂等的!

下面是我的代码是如何被调用的。

定时器触发功能:

[FunctionName("StartupFunc")]
public static async Task Run([TimerTrigger("0 */6 * * * *", RunOnStartup = true, UseMonitor = false)]TimerInfo myStartTimer, [OrchestrationClient] DurableOrchestrationClient orchestrationClient, TraceWriter log)
{
    List<OrchestrationModel> ExportModels = await getData();

    string id = await orchestrationClient.StartNewAsync("OrchestratorFunc", ExportModels);
}

编排功能:

[FunctionName("OrchestratorFunc")]
public static async Task<string> TransformOrchestration([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
    var dataList = context.GetInput<List<OrchestrationModel>>();
    var tasks = new List<Task>();

    foreach (var data in dataList)
    {
        tasks.Add(context.CallActivityAsync<string>("TransformToSql", new TransformModel(data));
    }
    await Task.WhenAll(tasks);
}

活动功能:

[FunctionName("TransformToSql")]
[public static async Task<string> RunTransformation([ActivityTrigger] DurableActivityContext context, TraceWriter log)
{
    TransformModel = context.GetInput<TransformModel>();

    //Do some work with TransformModel
}
4

2 回答 2

7

这种行为非常好——这就是 Durable Functions 的设计原理。

您有以下编排:

[FunctionName("OrchestratorFunc")]
public static async Task<string> TransformOrchestration([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
    var dataList = context.GetInput<List<OrchestrationModel>>();
    var tasks = new List<Task>();

    foreach (var data in dataList)
    {
        tasks.Add(context.CallActivityAsync<string>("TransformToSql", new TransformModel(data));
    }
    await Task.WhenAll(tasks);
}

当一个 Activity 被调用时,流程返回到一个称为Dispatcher的概念——它是 Durable Functions 的内部存在,负责维护您的编排流程。在等待任务完成之前,编排会被临时解除分配。一旦任务完成,整个编排将重播,直到下一次await发生。

重要的是,尽管重放了编排,但不会再次调用活动 - 它的结果是从存储中获取并使用的。使事情更明确,请考虑以下示例:

[FunctionName("OrchestratorFunc")]
public static async Task<string> TransformOrchestration([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
    var dataList = context.GetInput<List<OrchestrationModel>>();
    var tasks = new List<Task>();

    foreach (var data in dataList)
    {
        await context.CallActivityAsync<string>("TransformToSql1", new TransformModel(data);
        await context.CallActivityAsync<string>("TransformToSql2", new TransformModel(data);
    }
}

TransformToSql1等待时,编排被解除分配并且整个流程等待直到该活动完成。然后重放编排——它再次等待TransformToSql1,但由于它的结果被保存,它只是回到编排并等待TransformToSql2——然后重复该过程。

于 2018-09-12T07:10:33.137 回答
4

随着 Durable Function 框架的重放,编排功能将更频繁地运行。你看到你的活动函数也被再次触发了吗?如果是这样,那么这确实是一种奇怪的行为。

Durable Functions 使用存储队列和表来控制编排的流程和捕获状态。

每次触发编排功能时(通过从控制队列接收消息),它将重播编排。您可以使用IsReplaying.DurableOrchestrationContext

if (!context.IsReplaying)
{
   // this code will only run during the first execution of the workflow
}

不要使用IsReplaying来确定运行活动,而是将其用于记录/调试目的。

当达到活动功能时,会将一条消息放入工作项队列。然后,Durable Functions 框架将查看历史表以确定此活动函数之前是否已经运行过(使用给定的参数)。如果是,则它不会再次运行活动功能,它将继续执行其余的编排功能。

Durable Functions 的检查点和重放功能仅在编排代码具有确定性时才有效。因此,切勿使用基于 DateTime 或随机数的决策。

更多信息:https ://docs.microsoft.com/en-us/azure/azure-functions/durable-functions-checkpointing-and-replay

于 2018-09-12T07:38:01.370 回答