1

我有一个使用 HTTP 触发器和队列触发器构建的 C# Azure Functions(在应用服务计划中)应用程序。该应用程序通过在客户端计算机上安装脚本来工作,该脚本使用 SQL 查询从客户端数据库中提取各种文件,将输出移动到临时 Azure Blob 存储。在每个文件完成后,将调用一个 HTTP 触发器,该触发器为队列触发器创建一个队列消息,以获取消息并将文件从临时 blob 存储移动到 blob 存储中的永久位置。在 HTTP 触发器完成并将消息放入队列后,执行返回到客户端脚本以开始处理下一个 SQL 查询。

我担心的是,当队列触发器实际上仍在工作或可能失败时,这些队列消息会堆积起来,并且客户端脚本将完成错误的成功消息,尤其是在并行处理多个客户端时。有没有办法确保在继续下一个 SQL 查询之前成功处理队列消息?

编辑:添加代码示例

我可能有 3 个客户端,他们的机器上安装了一个应用程序,每个客户端都设置为在上午 12 点执行这些脚本,并且可以同时运行,因为它们托管在客户端机器上。 客户端脚本

// perform SQL query to extract data from client database
// move extracted data to temporary Storage Blob hosted on the App Service storage account
return await httpClient.PostAsync(uri of the file in temporary blob storage)

await当文件准备好被处理时,这首先发布到 HTTP。
Azure Functions HTTP 触发器

// get storage account credentials
// write message to storage queue "job-submissions'
return new OkResult();

现在我们在“作业提交”队列中有来自多个客户端的文件。
Azure Functions 队列触发器

// pick up message from "job-submissions" queue
// use the Microsoft.Azure.Storage.Blob library to move files
// to a permanent spot in the data lake
// create meta file with info about the file 
// meta file contains info for when the extraction started and completed
// delete the temporary file
// job completed and the next queue message can be picked up  

所以问题是,当 HTTP 触发器向队列写入消息时,我无法知道队列已经完成了对文件的处理。现在这不是一个大问题,因为这个过程发生得如此之快,以至于当我在 HTTP 触发器中向队列发送消息时,队列最多只需要几秒钟来处理文件。我想知道各个作业何时完成的原因是因为我在客户端脚本中有最后一步:
客户端脚本

// after all jobs for a client have been submitted by HTTP
// get storage account credentials
// write message to a queue "client-tasks-completed" 
// queue message contains client name in the message 
// initialVisibilityDelay set to 2 minutes 
// this ensures queue has finished processing the files

然后一个单独的 Python Azure 函数在该队列上侦听以进行进一步处理:
Python QueueTrigger

# pick up message from "client-tasks-completed" queue
if 'client1' == queue_msg['ClientName']:
    # standardize information within the files and write to our Azure SQL database
elif 'client2' == queue_msg['ClientName']:
    # standardize information within the files and write to our Azure SQL database
elif 'client3' == queue_msg['ClientName']:
    # standardize information within the files and write to our Azure SQL database

Python Azure 函数在消耗计划中,batchSize设置为,1因为客户端文件有时可能很大,我不想超过 1.5GB 内存限制。所以我有两个问题,第一个是我怎么知道第一个队列触发器完成了它的工作?第二个是,如何确保 Python QueueTrigger 不会开始累积消息?我认为这两个问题都可以通过为侦听同一队列的两个队列触发器创建单独的 Azure 函数来解决。这会减轻双方的负担,但我不确定这是否是最佳做法。请在此处查看我的问题,我在此处询问有关问题 2 的更多指导:使用多个 Azure Functions QueueTriggers to listen on the same storage queue

4

1 回答 1

2

更新:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Threading;

namespace FunctionApp31
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req,
            ILogger log)
        {

            string a = "111";

            a=XX(a).Result;

            return new OkObjectResult(a);
        }

        public static async Task<string> XX(string x)
        {
            await Task.Run(()=>{
                Thread.Sleep(3000);
                x = x + "222";
                Console.WriteLine(x);
                }
                );

            return x;
        } 
    }
}

原答案:

我建议您按顺序执行处理逻辑,而不是异步执行。或者您可以等待异步操作完成后再返回,这样可以确保执行成功后再返回成功。(这样可以避免在队列仍在处理时返回结果,如您在评论中描述的那样。)

我注意到你问了一个新问题。我认为您可以扩展实例而不是创建多个函数应用程序。(当然创建多个功能应用是没有问题的)如果是基于消费计划,实例会根据负载自动伸缩。

于 2020-11-06T04:22:40.023 回答