0

有几个持久函数可以相互调用。 主编排 -> 子编排 -> 活动 -> 辅助异步方法

每个 func 都具有 ILogger 依赖关系,并在函数开始和函数结束时登录。出于某种原因,两个协调器都重复“启动时”消息。(见图)活动没有这个效果。(见图)多次运行下面的例子 - 同样的故事。

我也确信整个过程已经触发了一次。

这是编排器中的错误还是预期的行为?

using System;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

namespace Issues
{
    public static class Log_Issue
    {
        [FunctionName("Main")]
        public static async Task RunOrchestrator(
            [OrchestrationTrigger] IDurableOrchestrationContext context,
            ILogger log)
        {
            try
            {
                log.LogWarning("Main Start");
                await context.CallSubOrchestratorAsync("Sub", null);
                log.LogWarning("Main End");
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
                throw;
            }
        }

        [FunctionName("Sub")]
        public static async Task RunSubOrchestrator(
            [OrchestrationTrigger] IDurableOrchestrationContext context,
            ILogger log)
        {
            log.LogWarning("Sub Start");
            var data = await context.CallActivityAsync<string>("Activity", null);
            log.LogWarning("Sub End");
        }

        [FunctionName("Activity")]
        public static async Task<string> GetDataActivity([ActivityTrigger] string name, ILogger log)
        {
            log.LogWarning("Activity Start");
            var data = await GetDataAsync("https://www.google.com");
            log.LogWarning("Activity End");

            return data;
        }

        [FunctionName("Start")]
        public static async Task<IActionResult> HttpStart(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")]
            HttpRequestMessage req,
            [DurableClient] IDurableOrchestrationClient starter,
            ILogger log)
        {
            var instanceId = await starter.StartNewAsync("Main", null);
            log.LogWarning($"Started orchestration with ID = '{instanceId}'.");
            return new OkResult();
        }

        private static async Task<string> GetDataAsync(string url)
        {
            var httpClient = new HttpClient();

            using var request = new HttpRequestMessage
            {
                RequestUri = new Uri(url),
                Method = HttpMethod.Get,
            };

            var response = await httpClient.SendAsync(request);
            response.EnsureSuccessStatusCode();

            return await response.Content.ReadAsStringAsync();
        }
    }
}

运行结果

4

2 回答 2

3

这是意料之中的。例如在 await context.CallActivityAsync("Activity", null); 代码会自行暂停,甚至可能会从内存中加载出来(为了节省成本)。

然后编排器等待将事件放置在该活动创建的另一个 Azure 存储表中,这可能会在很多天后发生。对于活动,它们通常是非常即时的,但它仍然等待此事件发生。

发生这种情况时,代码需要从上次停止的地方开始,但没有办法做到这一点。因此代码从头开始重新运行,而不是等待活动再次完成,它首先查看表并看到我们已经完成了这个活动并且可以继续运行。如果活动函数返回某个值,它将从 await 调用中返回。在协调器的两次运行期间,它都会记录,但因为我们只进入那些只会被记录的活动。

这就是编排器必须具有确定性的原因,因为例如第一次运行时的随机值与第二次运行时的随机值不同。相反,我们会将 random.Next() 放入一个活动函数中,以便将值保存到 Azure 表存储以在后续重新运行时使用。编排器也可能正在等待正常函数创建的一些外部事件。例如,有人必须验证他们的电子邮件帐户,这可能需要几天的时间,这就是为什么持久函数可以在事件触发时自行卸载并重新启动。

于 2021-02-12T11:55:59.560 回答
1

@FilipB 所说的都是真的。它只是缺少解决它的实际代码;)

[FunctionName("Main")]
public static async Task RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context,
ILogger log)
{
    log = context.CreateReplaySafeLogger(log); // this is what you should use at the start of every Orchestrator

于 2021-02-12T12:34:36.713 回答