2

我有以下情况:

我必须执行一个函数来检索 N(N 在 0 和无限之间)记录。我必须调用映射函数将记录转换为其他内容并将它们向前移动(通过 http、服务总线、cosmos db 等)

由于 10 分钟的限制,我无法使用常规的 Azure 函数,所以我正在寻找 Durable Functions 是否可以解决我的问题。

我的想法如下:
1 - 当持久函数触发时,它会从数据库中流式传输记录。
2 - 对于每条记录,它调用映射函数。
3 - 映射后,它通过服务总线将记录发送到消息。

作为概念证明,我做了下面的例子。我模拟在持久功能中接收 1000 条消息,但它的行为非常不可靠。如果我发送 1000 条消息,函数有点崩溃或需要很长时间才能完成,我希望这段代码几乎可以立即完成。

#r "Microsoft.Azure.WebJobs.Extensions.DurableTask"

public static async Task<List<string>> Run(DurableOrchestrationContext context, TraceWriter log)
{
    var outputs = new List<string>();

    var tasks = new List<Task<string>>();
    for(int i = 0; i < 1000; i++)
    {
        log.Info(i.ToString());
        tasks.Add(context.CallActivityAsync<string>("Hello", i.ToString()));
    }

    outputs.AddRange(await Task.WhenAll(tasks.ToArray()));

    return outputs;
}

我的问题是:Durable Functions 是否适合这种情况?我应该研究一些非无服务函数方法来从数据库中提取数据吗?

有没有办法从持久函数中同步调用另一个 Azure 函数?

4

2 回答 2

4

在开始之前,您必须考虑 Durable Functions 的真正工作原理。要了解流程,请看以下示例:

#r "Microsoft.Azure.WebJobs.Extensions.DurableTask"

public static async Task Run(DurableOrchestrationContext context, TraceWriter log)
{
    await context.CallActivityAsync<string>("Hello1");
    await context.CallActivityAsync<string>("Hello2");
}

运行时的工作方式如下:

  1. 它进入编排并命中第一个,其中调用await了一个活动Hello1
  2. 控件返回到一个名为Dispatcher的组件,该组件是框架的内部部分。它检查当前编排 ID 是否已调用此特定活动。如果不是,它会等待结果并释放编排使用的资源
  3. 等待Task完成后,调度程序重新创建编排并从头开始重播
  4. 它再次等待活动Hello1,但这一次在查阅编排历史后它知道,它已被调用并保存了结果 - 它使用保存的结果并继续执行
  5. 它达到了第二个await,整个循环又一次

正如您所看到的,在幕后需要执行一些严肃的工作。在将工作委派给编排和活动时,还有一个经验法则:

  • 编排应该只编排 - 因为它有许多限制,例如单线程,只等待安全任务(这意味着DurableOrchestrationContext类型上可用的任务)并且在多个队列(而不是 VM)之间扩展。更重要的是它必须是幂等的(所以它不能使用例如DateTime.Now或直接查询数据库)
  • 活动应该执行工作 - 它作为典型功能工作(没有编排限制)并扩展到多个不同的虚拟机

在您的场景中,您应该只执行一个活动,这将完成所有工作,而不是遍历编排中的记录(特别是因为您不能在编排中使用绑定到例如服务总线 - 但是您可以这样做活动,它可以获取数据,对其进行转换,然后推送到您想要的任何类型的服务)。所以在你的代码中你可以有这样的东西:

[FunctionName("Orchestration_Client")]
public static async Task<string> Orchestration_Client(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "start")] HttpRequestMessage input,
    [OrchestrationClient] DurableOrchestrationClient starter)
{
    return await starter.StartNewAsync("Orchestration", await input.Content.ReadAsStringAsync());
}

[FunctionName("Orchestration")]
public static async Task Orchestration_Start([OrchestrationTrigger] DurableOrchestrationContext context)
{
    var payload = context.GetInput<string>();
    await context.CallActivityAsync(nameof(Activity), payload);
}

[FunctionName("Activity")]
public static string Activity(
    [ActivityTrigger] DurableActivityContext context,
    [Table(TableName, Connection = "TableStorageConnectionName")] IAsyncCollector<FooEntity> foo)
{
    // Get data from request
    var payload = context.GetInput<string>();

    // Fetch data from database
    using(var conn = new SqlConnection())
    ...

    // Transform it
    foreach(var record in databaseResult) 
    {
        // Do some work and push data
        await foo.AddAsync(new FooEntity() { // Properties });
    }

    // Result
    return $"Processed {count} records!!";
}

这更像是一个想法,而不是一个真实的例子,但你应该能够明白这一点。另一件事是,Durable Functions 是否真的是此类操作的最佳解决方案——我相信有更好的服务,例如 Azure 数据工厂。

于 2018-07-27T09:55:00.590 回答
0

另外,您可以在此处详细按照分步示例进行操作

https://sps-cloud-architect.blogspot.com/2019/12/azure-data-load-etl-process-using-azure.html

于 2019-12-12T05:21:00.813 回答