6

I have an existing Function App with 2 Functions and a storage queue. F1 is triggered by a message in a service bus topic. For each msg received, F1 calculates some sub-tasks (T1,T2,...) which have to be executed with varying amount of delay. Ex - T1 to be fired after 3 mins, T2 after 5min etc. F1 posts messages to a storage queue with appropriate visibility timeouts (to simulate the delay) and F2 is triggered whenever a message is visible in the queue. All works fine.

I now want to migrate this app to use 'Durable Functions'. F1 now only starts the orchestrator. The orchestrator code is something as follows -

    public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
    {
        var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
        List<Task> tasks = new List<Task>();
        foreach (var value in results)
        {
            var pnTask = context.CallActivityAsync("PerformSubTask", value);
            tasks.Add(pnTask);
        }

        //dont't await as we want to fire and forget. No fan-in!
        //await Task.WhenAll(tasks);
    }

    [FunctionName("PerformSubTask")]
    public async static Task Run([ActivityTrigger]TaskInfo info, TraceWriter log)
    {
         TimeSpan timeDifference = DateTime.UtcNow - info.Origin.ToUniversalTime();
         TimeSpan delay = TimeSpan.FromSeconds(info.DelayInSeconds);
         var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;

         //will still keep the activity function running and incur costs??
         await Task.Delay(actualDelay);
   
         //perform subtask work after delay! 
    }

I would only like to fan-out (no fan-in to collect the results) and start the subtasks. The orchestrator starts all the tasks and avoids call 'await Task.WhenAll'. The activity function calls 'Task.Delay' to wait for the specified amount of time and then does its work.

My questions

  • Does it make sense to use Durable Functions for this workflow?

  • Is this the right approach to orchestrate 'Fan-out' workflow?

  • I do not like the fact that the activity function is running for specified amount of time (3 or 5 mins) doing nothing. It will incurs costs,or?

  • Also if a delay of more than 10 minutes is required there is no way for an activity function to succeed with this approach!

  • My earlier attempt to avoid this was to use 'CreateTimer' in the orchestrator and then add the activity as a continuation, but I see only timer entries in the 'History' table. The continuation does not fire! Am I violating the constraint for orchestrator code - 'Orchestrator code must never initiate any async operation' ?

     foreach (var value in results)
     {
             //calculate time to start
             var timeToStart = ;
             var pnTask = context.CreateTimer(timeToStart , CancellationToken.None).ContinueWith(t => context.CallActivityAsync("PerformSubTask", value));
             tasks.Add(pnTask);
     }
    

UPDATE : using approach suggested by Chris

Activity that calculates subtasks and delays

    [FunctionName("CalculateTasks")]
    public static List<TaskInfo> CalculateTasks([ActivityTrigger]string input,TraceWriter log)
    {
        //in reality time is obtained by calling an endpoint 
        DateTime currentTime = DateTime.UtcNow;
        return new List<TaskInfo> {
            new TaskInfo{ DelayInSeconds = 10, Origin = currentTime },
            new TaskInfo{ DelayInSeconds = 20, Origin = currentTime },
            new TaskInfo{ DelayInSeconds = 30, Origin = currentTime },
        };
    }

    public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
    {
        var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
        var currentTime = context.CurrentUtcDateTime;
        List<Task> tasks = new List<Task>();
        foreach (var value in results)
        {
            TimeSpan timeDifference = currentTime - value.Origin;
            TimeSpan delay = TimeSpan.FromSeconds(value.DelayInSeconds);
            var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;

            var timeToStart = currentTime.Add(actualDelay);

            Task delayedActivityCall = context
                 .CreateTimer(timeToStart, CancellationToken.None)
                 .ContinueWith(t => context.CallActivityAsync("PerformSubtask", value));
            tasks.Add(delayedActivityCall);
        }

        await Task.WhenAll(tasks);
    }

Simply scheduling tasks from within the orchestrator seems to work.In my case I am calculating the tasks and the delays in another activity (CalculateTasks) before the loop. I want the delays to be calculated using the 'current time' when the activity was run. I am using DateTime.UtcNow in the activity. This somehow does not play well when used in the orchestrator. The activities specified by 'ContinueWith' just don't run and the orchestrator is always in 'Running' state.

Can I not use the time recorded by an activity from within the orchestrator?

UPDATE 2

So the workaround suggested by Chris works!

Since I do not want to collect the results of the activities I avoid calling 'await Tasks.WhenAll(tasks)' after scheduling all activities. I do this in order to reduce the contention on the control queue i.e. be able to start another orchestration if reqd. Nevertheless the status of the 'orchestrator' is still 'Running' till all the activities finish running. I guess it moves to 'Complete' only after the last activity posts a 'done' message to the control queue.

Am I right? Is there any way to free the orchestrator earlier i.e right after scheduling all activities?

4

4 回答 4

6

这种ContinueWith方法对我来说很好。我能够使用以下编排器代码模拟您的场景版本:

[FunctionName("Orchestrator")]
public static async Task Orchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context,
    TraceWriter log)
{
    var tasks = new List<Task>(10);
    for (int i = 0; i < 10; i++)
    {
        int j = i;
        DateTime timeToStart = context.CurrentUtcDateTime.AddSeconds(10 * j);
        Task delayedActivityCall = context
            .CreateTimer(timeToStart, CancellationToken.None)
            .ContinueWith(t => context.CallActivityAsync("PerformSubtask", j));
        tasks.Add(delayedActivityCall);
    }

    await Task.WhenAll(tasks);
}

值得一提的是,这里是活动功能代码。

[FunctionName("PerformSubtask")]
public static void Activity([ActivityTrigger] int j, TraceWriter log)
{
    log.Warning($"{DateTime.Now:o}: {j:00}");
}

从日志输出中,我看到所有活动调用彼此间隔 10 秒。

另一种方法是扇出多个子编排(如@jeffhollan 建议的),这些子编排很简单,是持久计时器延迟和您的活动调用的简短序列。

更新 我尝试使用您更新的示例并且能够重现您的问题!如果您在 Visual Studio 中本地运行并将异常设置配置为始终中断异常,那么您应该看到以下内容:

System.InvalidOperationException : '检测到多线程执行。如果编排器函数代码等待不是由 DurableOrchestrationContext 方法创建的任务,则可能会发生这种情况。更多详细信息可在本文https://docs.microsoft.com/en-us/azure/azure-functions/durable-functions-checkpointing-and-replay#orchestrator-code-constraints中找到。

这意味着调用context.CallActivityAsync("PerformSubtask", j)的线程调用协调器函数的线程不同。我不知道为什么我最初的示例没有达到这一点,或者为什么你的版本没有。它与 TPL 如何决定使用哪个线程来运行您的ContinueWith委托有关——我需要进一步研究。

好消息是有一个简单的解决方法,即指定TaskContinuationOptions.ExecuteSynchronously,如下所示:

Task delayedActivityCall = context
    .CreateTimer(timeToStart, CancellationToken.None)
    .ContinueWith(
        t => context.CallActivityAsync("PerformSubtask", j),
        TaskContinuationOptions.ExecuteSynchronously);

请尝试一下,让我知道这是否解决了您观察到的问题。

理想情况下,您在使用Task.ContinueWith. 我在 GitHub 中打开了一个问题来跟踪它:https ://github.com/Azure/azure-functions-durable-extension/issues/317

由于我不想收集活动的结果,因此我避免await Tasks.WhenAll(tasks)在安排所有活动后调用。我这样做是为了减少对控制队列的争用,即如果需要,能够启动另一个编排。尽管如此,“协调者”的状态仍然是“正在运行”,直到所有活动完成运行。我猜它只有在最后一个活动向控制队列发布“完成”消息后才会移动到“完成”。

这是意料之中的。在所有未完成的持久任务完成之前,Orchestrator 功能永远不会真正完成。没有任何方法可以解决这个问题。请注意,您仍然可以启动其他编排器实例,如果它们碰巧落在同一个分区上(默认情况下有 4 个分区),可能会有一些争用。

于 2018-05-18T12:47:45.840 回答
3

await Task.Delay绝对不是最好的选择:你会为此付出代价,而你的函数不会做任何有用的事情。消费计划的最大延迟也被限制为 10 分钟。

在我看来,原始队列消息是即发即弃场景的最佳选择。设置适当的可见性超时,您的方案将可靠且高效地工作。

Durable Functions 的杀手级功能是awaits,它可以在保持范围的同时发挥暂停和恢复的魔力。因此,这是实现扇入的好方法,但您不需要它。

于 2018-05-17T14:47:46.300 回答
0

我认为耐用对于这个工作流程绝对有意义。我确实认为最好的选择是利用您所说的延迟/计时器功能,但基于执行的同步性质,我认为我不会将所有内容添加到真正期待的任务.WhenAll()列表.WhenAny()中目标是。我想我个人只会为每个任务做一个带有计时器延迟的顺序 foreach 循环。所以伪代码:

for(int x = 0; x < results.Length; x++) { await context.CreateTimer(TimeSpan.FromMinutes(1), ...); await context.CallActivityAsync("PerformTaskAsync", results[x]); }

无论如何,您都需要在那里等待,因此只是避免await Task.WhenAll(...)可能会在上面的代码示例中引起一些问题。希望有帮助

于 2018-05-17T14:15:54.640 回答
0

您应该能够使用IDurableOrchestrationContext.StartNewOrchestration()2019 年添加的方法来支持这种情况。有关上下文,请参阅https://github.com/Azure/azure-functions-durable-extension/issues/715

于 2021-11-30T13:21:40.773 回答