15

在 Azure App Service 和 .NET Core 3.1 中,在不需要数据库且对该应用程序之外的任何内容都没有 IO 的应用程序中,什么是合适的解决方案?这是一个计算任务。

具体来说,以下是不可靠的,需要解决方案。

[Route("service")]
[HttpPost]
public Outbound Post(Inbound inbound)
{
    Debug.Assert(inbound.Message.Equals("Hello server."));
    Outbound outbound = new Outbound();
    long Billion = 1000000000;
    for (long i = 0; i < 33 * Billion; i++) // 230 seconds
        ;
    outbound.Message = String.Format("The server processed inbound object.");
    return outbound;
}

这有时会返回一个空对象HttpClient(未显示)。较小的工作量总是会成功。例如 30 亿次迭代总是成功的。更大的数字会很好,特别是需要 2400 亿。

我认为在 2020 年,使用 .NET Core 的 Azure 应用服务的一个合理目标可能是在 8 个子线程的帮助下将父线程数增加到 2400 亿个,因此每个子线程数达到 300 亿个,而父线程划分为 8 M 字节入站对象为每个子项的入站对象。每个孩子收到一个 1 M 字节的入站数据,并返回给父母一个 1 M 字节的出站数据。父级将结果重新组合成一个 8 M 字节的出站。

显然,经过的时间将是单线程实现所需时间的 12.5%,或 1/8,或八分之一。与计算时间相比,切割和重新组装对象的时间很小。我假设传输对象的时间与计算时间相比非常小,因此 12.5% 的预期大致准确。

如果我能得到 4 或 8 个核心,那就太好了。如果我能得到一个核心周期的 50% 的线程,那么我可能需要 8 或 16 个线程。如果每个线程给我 33% 的核心周期,那么我需要 12 或 24 个线程。

我正在考虑BackgroundService上课,但我正在寻找确认这是正确的方法。微软说...

BackgroundService is a base class for implementing a long running IHostedService.

显然,如果某些东西长时间运行,最好通过使用多个内核来更快地完成它,System.Threading但是这个文档似乎System.Threading只在通过启动任务的上下文中提到System.Threading.Timer。我的示例代码显示我的应用程序中不需要计时器。HTTP POST 将作为工作的机会。通常我会System.Threading.Thread用来实例化多个对象以使用多个内核。我发现,在需要很长时间的工作解决方案的上下文中,没有提及多核是一个明显的遗漏,但可能有某些原因 Azure App Service 不处理这个问题。也许我只是无法在教程和文档中找到它。

任务的启动是图示的 HTTP POST 控制器。假设最长的工作需要 10 分钟。HTTP 客户端(未显示)将超时限制设置为 1000 秒,这远远超过 10 分钟(600 秒),以便有安全边际。HttpClient.Timeout是相关属性。目前我假设 HTTP 超时是一个真正的限制;而不是某种非约束性(假限制),这样一些其他约束会导致用户等待 9 分钟并收到错误消息。真正的绑定限制是我可以说“但对于这个超时它会成功”的限制。如果 HTTP 超时不是真正的绑定限制并且存在其他限制系统的因素,我可以调整我的 HTTP 控制器以改为使用三 (3) 个 POST 方法。因此 POST1 意味着使用入站对象启动任务。POST2 的意思是告诉我它是否完成。POST3 意味着给我出站对象。

在 Azure App Service 和 .NET Core 3.1 中,在不需要数据库且对该应用程序之外的任何内容都没有 IO 的应用程序中,什么是合适的解决方案?这是一个计算任务。

4

5 回答 5

27

序幕

几年前遇到了一个非常相似的问题。我们需要一种可以处理大量数据的服务。有时处理需要 10 秒,有时可能需要一个小时。

起初,我们是按照您的问题说明的方式进行的:向服务发送请求,服务处理来自请求的数据并在完成后返回响应。

手头的问题

当工作只需要大约一分钟或更短的时间时,这很好,但超过此时间,服务器将关闭会话并且调用者会报告错误。

服务器在放弃请求之前有大约 2 分钟的默认时间来产生响应。它不会退出对请求的处理......但它会退出 HTTP 会话。您在 上设置的参数无关紧要HttpClient,服务器是委托多长时间过长的服务器。

问题原因

这一切都是有充分理由的。服务器套接字非常昂贵。你有一个有限的数额。服务器试图通过切断花费超过指定时间的请求来保护您的服务,以避免套接字饥饿问题。

通常,您希望您的 HTTP 请求只需几毫秒。如果它们花费的时间比这更长,如果您的服务必须以高速率满足其他请求,您最终会遇到套接字问题。

解决方案

我们决定走的路线IHostedService,特别是BackgroundService。我们将此服务与队列结合使用。通过这种方式,您可以设置一个作业队列,并且BackgroundService一次处理一个(在某些情况下,我们有服务一次处理多个队列项目,在其他情况下,我们水平扩展生成两个或更多队列)。

为什么要运行 ASP.NET Core 服务BackgroundService?我想在不与任何特定于 Azure 的结构紧密耦合的情况下处理这个问题,以防我们需要从 Azure 迁移到其他云服务(当时我们出于其他原因考虑这样做)。

这对我们来说效果很好,从那以后我们没有看到任何问题。

工作流程是这样的:

  1. 调用者使用一些参数向服务发送请求
  2. 服务生成一个“作业”对象并立即通过 202(已接受)响应返回一个 ID
  3. 服务将此作业放入一个队列中,该队列由BackgroundService
  4. 调用者可以使用此作业 ID 查询作业状态并获取有关已完成多少以及剩余多少的信息
  5. 服务完成作业,将作业置于“已完成”状态并返回等待队列以产生更多作业

请记住,您的服务能够在运行多个实例的情况下水平扩展。在这种情况下,我使用 Redis Cache 来存储作业的状态,以便所有实例共享相同的状态。

如果您没有可用的 Redis 缓存,我还添加了“内存缓存”选项以在本地进行测试。您可以在服务器上运行“内存缓存”服务,只要知道它可以扩展,那么您的数据就会不一致。

例子

由于我已婚并有孩子,周五晚上每个人都睡觉后我真的不怎么做,所以我花了一些时间整理了一个你可以尝试的例子。完整的解决方案也可供您试用。

QueuedBackgroundService.cs

此类实现有两个特定目的:一个是从队列中读取(BackgroundService实现),另一个是写入队列(IQueuedBackgroundService实现)。

    public interface IQueuedBackgroundService
    {
        Task<JobCreatedModel> PostWorkItemAsync(JobParametersModel jobParameters);
    }

    public sealed class QueuedBackgroundService : BackgroundService, IQueuedBackgroundService
    {
        private sealed class JobQueueItem
        {
            public string JobId { get; set; }
            public JobParametersModel JobParameters { get; set; }
        }

        private readonly IComputationWorkService _workService;
        private readonly IComputationJobStatusService _jobStatusService;

        // Shared between BackgroundService and IQueuedBackgroundService.
        // The queueing mechanism could be moved out to a singleton service. I am doing
        // it this way for simplicity's sake.
        private static readonly ConcurrentQueue<JobQueueItem> _queue =
            new ConcurrentQueue<JobQueueItem>();
        private static readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

        public QueuedBackgroundService(IComputationWorkService workService,
            IComputationJobStatusService jobStatusService)
        {
            _workService = workService;
            _jobStatusService = jobStatusService;
        }

        /// <summary>
        /// Transient method via IQueuedBackgroundService
        /// </summary>
        public async Task<JobCreatedModel> PostWorkItemAsync(JobParametersModel jobParameters)
        {
            var jobId = await _jobStatusService.CreateJobAsync(jobParameters).ConfigureAwait(false);
            _queue.Enqueue(new JobQueueItem { JobId = jobId, JobParameters = jobParameters });
            _signal.Release(); // signal for background service to start working on the job
            return new JobCreatedModel { JobId = jobId, QueuePosition = _queue.Count };
        }

        /// <summary>
        /// Long running task via BackgroundService
        /// </summary>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while(!stoppingToken.IsCancellationRequested)
            {
                JobQueueItem jobQueueItem = null;
                try
                {
                    // wait for the queue to signal there is something that needs to be done
                    await _signal.WaitAsync(stoppingToken).ConfigureAwait(false);

                    // dequeue the item
                    jobQueueItem = _queue.TryDequeue(out var workItem) ? workItem : null;

                    if(jobQueueItem != null)
                    {
                        // put the job in to a "processing" state
                        await _jobStatusService.UpdateJobStatusAsync(
                            jobQueueItem.JobId, JobStatus.Processing).ConfigureAwait(false);

                        // the heavy lifting is done here...
                        var result = await _workService.DoWorkAsync(
                            jobQueueItem.JobId, jobQueueItem.JobParameters,
                            stoppingToken).ConfigureAwait(false);

                        // store the result of the work and set the status to "finished"
                        await _jobStatusService.StoreJobResultAsync(
                            jobQueueItem.JobId, result, JobStatus.Success).ConfigureAwait(false);
                    }
                }
                catch(TaskCanceledException)
                {
                    break;
                }
                catch(Exception ex)
                {
                    try
                    {
                        // something went wrong. Put the job in to an errored state and continue on
                        await _jobStatusService.StoreJobResultAsync(jobQueueItem.JobId, new JobResultModel
                        {
                            Exception = new JobExceptionModel(ex)
                        }, JobStatus.Errored).ConfigureAwait(false);
                    }
                    catch(Exception)
                    {
                        // TODO: log this
                    }
                }
            }
        }
    }

它是这样注入的:

    services.AddHostedService<QueuedBackgroundService>();
    services.AddTransient<IQueuedBackgroundService, QueuedBackgroundService>();

计算控制器.cs

用于读取/写入作业的控制器如下所示:

    [ApiController, Route("api/[controller]")]
    public class ComputationController : ControllerBase
    {
        private readonly IQueuedBackgroundService _queuedBackgroundService;
        private readonly IComputationJobStatusService _computationJobStatusService;

        public ComputationController(
            IQueuedBackgroundService queuedBackgroundService,
            IComputationJobStatusService computationJobStatusService)
        {
            _queuedBackgroundService = queuedBackgroundService;
            _computationJobStatusService = computationJobStatusService;
        }

        [HttpPost, Route("beginComputation")]
        [ProducesResponseType(StatusCodes.Status202Accepted, Type = typeof(JobCreatedModel))]
        public async Task<IActionResult> BeginComputation([FromBody] JobParametersModel obj)
        {
            return Accepted(
                await _queuedBackgroundService.PostWorkItemAsync(obj).ConfigureAwait(false));
        }

        [HttpGet, Route("computationStatus/{jobId}")]
        [ProducesResponseType(StatusCodes.Status200OK, Type = typeof(JobModel))]
        [ProducesResponseType(StatusCodes.Status404NotFound, Type = typeof(string))]
        public async Task<IActionResult> GetComputationResultAsync(string jobId)
        {
            var job = await _computationJobStatusService.GetJobAsync(jobId).ConfigureAwait(false);
            if(job != null)
            {
                return Ok(job);
            }
            return NotFound($"Job with ID `{jobId}` not found");
        }

        [HttpGet, Route("getAllJobs")]
        [ProducesResponseType(StatusCodes.Status200OK,
            Type = typeof(IReadOnlyDictionary<string, JobModel>))]
        public async Task<IActionResult> GetAllJobsAsync()
        {
            return Ok(await _computationJobStatusService.GetAllJobsAsync().ConfigureAwait(false));
        }

        [HttpDelete, Route("clearAllJobs")]
        [ProducesResponseType(StatusCodes.Status200OK)]
        [ProducesResponseType(StatusCodes.Status401Unauthorized)]
        public async Task<IActionResult> ClearAllJobsAsync([FromQuery] string permission)
        {
            if(permission == "this is flakey security so this can be run as a public demo")
            {
                await _computationJobStatusService.ClearAllJobsAsync().ConfigureAwait(false);
                return Ok();
            }
            return Unauthorized();
        }
    }

工作示例

只要这个问题是活跃的,我就会维护一个你可以尝试的工作示例。对于此特定示例,您可以指定要运行的迭代次数。为了模拟长时间运行的工作,每次迭代为 1 秒。因此,如果将迭代值设置为 60,它将运行该作业 60 秒。

在它运行时,运行computationStatus/{jobId}orgetAllJobs端点。您可以实时观看所有工作更新。

这个例子远不是一个功能齐全的覆盖所有边缘案例的全面准备生产的例子,但它是一个好的开始。

结论

在后端工作了几年后,我看到很多问题是由于不了解后端的所有“规则”而出现的。希望这个答案能对我过去遇到的问题有所了解,并希望这可以使您不必处理上述问题。

于 2020-08-15T18:00:50.183 回答
2

一种选择可能是尝试Azure Durable Functions,它更面向长时间运行的作业,这些作业保证检查点和状态,而不是尝试在触发请求的上下文中完成。它还具有扇出/扇入的概念,以防您所描述的内容可以分为具有汇总结果的较小作业。

如果目标只是原始计算,Azure Batch可能是更好的选择,因为它有助于扩展。

于 2020-08-12T04:19:33.440 回答
0

由于您说您的计算在较少的迭代中成功,一个简单的解决方案是简单地定期保存您的结果并恢复计算。

例如,假设您需要执行 2400 亿次迭代,并且您知道要可靠执行的最高迭代次数是 30 亿次迭代,我将设置以下内容:

  1. 一个从机,实际执行任务(2400 亿次迭代)
  2. 定期从从属设备接收有关进度的输入的主设备。

从站可以定期向主站发送消息(比如每 20 亿次迭代一次?)。如果计算被中断,该消息可以包含与恢复计算相关的任何内容。

  1. 主人应该跟踪奴隶。如果主设备确定从设备已经死亡/崩溃/无论如何,主设备应该简单地创建一个新的从设备,它应该从最后报告的位置恢复计算。

究竟如何实现 master 和 slave 取决于您的个人喜好。

与其让单个循环执行 2400 亿次迭代,如果您可以跨节点拆分计算,我会尝试在尽可能多的节点上同时并行计算解决方案。

我个人将 node.js 用于多核项目。尽管您使用的是 asp.net,但我还是包含了这个 node.js 示例来说明适合我的架构。

多核机器上的 Node.js

https://dzone.com/articles/multicore-programming-in-nodejs

正如 Noah Stahl 在他的回答中提到的那样,Azure Durable Functions 和 Azure Batch 似乎可以帮助您在平台上实现目标。请参阅他的答案以获取更多详细信息。

于 2020-08-15T02:49:31.780 回答
0

我认为需要完成的实际工作是迭代循环之外什么都不做,所以就可能的并行化而言,我现在无法提供太多帮助。工作是 CPU 密集型还是 IO 相关?

当涉及到 Azure 应用服务中的长期运行工作时,一种选择是使用Web Job。一种可能的解决方案是将计算请求发布到队列(存储队列Azure 消息总线队列)。然后,Web 作业处理这些消息,并可能将新消息放在请求者可以用来处理结果的另一个队列中。

如果保证处理所需的时间少于 10 分钟,则可以将 Web Job 替换为Queue Triggered Azure Function。它是 Azure 上的无服务器产品,具有很大的扩展可能性。

另一种选择确实是使用Service WorkerIHostingService的实例并在那里进行一些队列处理。

于 2020-08-12T11:10:07.927 回答
0

标准答案是使用异步消息传递。我有一个关于这个主题的博客系列。由于您已经在 Azure 中,因此尤其如此。

你已经有一个 Azure Web 应用服务,但现在你想在请求之外运行代码- “请求外部代码”。运行该代码的正确方法是在单独的进程中 - Azure Functions 或 Azure WebJobs 非常适合 Azure webapps

首先,您需要一个持久队列。Azure 存储队列非常适合,因为无论如何你都在 Azure 中。然后您的 webapi 可以将消息写入队列并返回。这里重要的部分是这是一个持久队列,而不是内存队列

同时,Azure Function / WebJob 正在处理该队列。它将从队列中提取工作并执行它。

最后一块拼图是完成通知。这是一种很常见的方法:

我可以将我的 HTTP 控制器调整为使用三 (3) 个 POST 方法。因此 POST1 意味着使用入站对象启动任务。POST2 的意思是告诉我它是否完成。POST3 意味着给我出站对象。

为此,您的后台处理器应将“进行中”/“完成/结果”状态保存在 webapi 进程可以访问的位置。如果您已经有一个共享数据库(并且保留结果是有意义的),那么这可能是最简单的选择。我还会考虑使用 Azure Cosmos DB,它有一个不错的生存时间设置,因此后台服务可以注入“24 小时内有效”或其他什么的结果,之后它们会被自动清理。

于 2021-04-06T12:54:16.093 回答