序幕
几年前遇到了一个非常相似的问题。我们需要一种可以处理大量数据的服务。有时处理需要 10 秒,有时可能需要一个小时。
起初,我们是按照您的问题说明的方式进行的:向服务发送请求,服务处理来自请求的数据并在完成后返回响应。
手头的问题
当工作只需要大约一分钟或更短的时间时,这很好,但超过此时间,服务器将关闭会话并且调用者会报告错误。
服务器在放弃请求之前有大约 2 分钟的默认时间来产生响应。它不会退出对请求的处理......但它会退出 HTTP 会话。您在 上设置的参数无关紧要HttpClient
,服务器是委托多长时间过长的服务器。
问题原因
这一切都是有充分理由的。服务器套接字非常昂贵。你有一个有限的数额。服务器试图通过切断花费超过指定时间的请求来保护您的服务,以避免套接字饥饿问题。
通常,您希望您的 HTTP 请求只需几毫秒。如果它们花费的时间比这更长,如果您的服务必须以高速率满足其他请求,您最终会遇到套接字问题。
解决方案
我们决定走的路线IHostedService
,特别是BackgroundService
。我们将此服务与队列结合使用。通过这种方式,您可以设置一个作业队列,并且BackgroundService
一次处理一个(在某些情况下,我们有服务一次处理多个队列项目,在其他情况下,我们水平扩展生成两个或更多队列)。
为什么要运行 ASP.NET Core 服务BackgroundService
?我想在不与任何特定于 Azure 的结构紧密耦合的情况下处理这个问题,以防我们需要从 Azure 迁移到其他云服务(当时我们出于其他原因考虑这样做)。
这对我们来说效果很好,从那以后我们没有看到任何问题。
工作流程是这样的:
- 调用者使用一些参数向服务发送请求
- 服务生成一个“作业”对象并立即通过 202(已接受)响应返回一个 ID
- 服务将此作业放入一个队列中,该队列由
BackgroundService
- 调用者可以使用此作业 ID 查询作业状态并获取有关已完成多少以及剩余多少的信息
- 服务完成作业,将作业置于“已完成”状态并返回等待队列以产生更多作业
请记住,您的服务能够在运行多个实例的情况下水平扩展。在这种情况下,我使用 Redis Cache 来存储作业的状态,以便所有实例共享相同的状态。
如果您没有可用的 Redis 缓存,我还添加了“内存缓存”选项以在本地进行测试。您可以在服务器上运行“内存缓存”服务,只要知道它可以扩展,那么您的数据就会不一致。
例子
由于我已婚并有孩子,周五晚上每个人都睡觉后我真的不怎么做,所以我花了一些时间整理了一个你可以尝试的例子。完整的解决方案也可供您试用。
QueuedBackgroundService.cs
此类实现有两个特定目的:一个是从队列中读取(BackgroundService
实现),另一个是写入队列(IQueuedBackgroundService
实现)。
public interface IQueuedBackgroundService
{
Task<JobCreatedModel> PostWorkItemAsync(JobParametersModel jobParameters);
}
public sealed class QueuedBackgroundService : BackgroundService, IQueuedBackgroundService
{
private sealed class JobQueueItem
{
public string JobId { get; set; }
public JobParametersModel JobParameters { get; set; }
}
private readonly IComputationWorkService _workService;
private readonly IComputationJobStatusService _jobStatusService;
// Shared between BackgroundService and IQueuedBackgroundService.
// The queueing mechanism could be moved out to a singleton service. I am doing
// it this way for simplicity's sake.
private static readonly ConcurrentQueue<JobQueueItem> _queue =
new ConcurrentQueue<JobQueueItem>();
private static readonly SemaphoreSlim _signal = new SemaphoreSlim(0);
public QueuedBackgroundService(IComputationWorkService workService,
IComputationJobStatusService jobStatusService)
{
_workService = workService;
_jobStatusService = jobStatusService;
}
/// <summary>
/// Transient method via IQueuedBackgroundService
/// </summary>
public async Task<JobCreatedModel> PostWorkItemAsync(JobParametersModel jobParameters)
{
var jobId = await _jobStatusService.CreateJobAsync(jobParameters).ConfigureAwait(false);
_queue.Enqueue(new JobQueueItem { JobId = jobId, JobParameters = jobParameters });
_signal.Release(); // signal for background service to start working on the job
return new JobCreatedModel { JobId = jobId, QueuePosition = _queue.Count };
}
/// <summary>
/// Long running task via BackgroundService
/// </summary>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while(!stoppingToken.IsCancellationRequested)
{
JobQueueItem jobQueueItem = null;
try
{
// wait for the queue to signal there is something that needs to be done
await _signal.WaitAsync(stoppingToken).ConfigureAwait(false);
// dequeue the item
jobQueueItem = _queue.TryDequeue(out var workItem) ? workItem : null;
if(jobQueueItem != null)
{
// put the job in to a "processing" state
await _jobStatusService.UpdateJobStatusAsync(
jobQueueItem.JobId, JobStatus.Processing).ConfigureAwait(false);
// the heavy lifting is done here...
var result = await _workService.DoWorkAsync(
jobQueueItem.JobId, jobQueueItem.JobParameters,
stoppingToken).ConfigureAwait(false);
// store the result of the work and set the status to "finished"
await _jobStatusService.StoreJobResultAsync(
jobQueueItem.JobId, result, JobStatus.Success).ConfigureAwait(false);
}
}
catch(TaskCanceledException)
{
break;
}
catch(Exception ex)
{
try
{
// something went wrong. Put the job in to an errored state and continue on
await _jobStatusService.StoreJobResultAsync(jobQueueItem.JobId, new JobResultModel
{
Exception = new JobExceptionModel(ex)
}, JobStatus.Errored).ConfigureAwait(false);
}
catch(Exception)
{
// TODO: log this
}
}
}
}
}
它是这样注入的:
services.AddHostedService<QueuedBackgroundService>();
services.AddTransient<IQueuedBackgroundService, QueuedBackgroundService>();
计算控制器.cs
用于读取/写入作业的控制器如下所示:
[ApiController, Route("api/[controller]")]
public class ComputationController : ControllerBase
{
private readonly IQueuedBackgroundService _queuedBackgroundService;
private readonly IComputationJobStatusService _computationJobStatusService;
public ComputationController(
IQueuedBackgroundService queuedBackgroundService,
IComputationJobStatusService computationJobStatusService)
{
_queuedBackgroundService = queuedBackgroundService;
_computationJobStatusService = computationJobStatusService;
}
[HttpPost, Route("beginComputation")]
[ProducesResponseType(StatusCodes.Status202Accepted, Type = typeof(JobCreatedModel))]
public async Task<IActionResult> BeginComputation([FromBody] JobParametersModel obj)
{
return Accepted(
await _queuedBackgroundService.PostWorkItemAsync(obj).ConfigureAwait(false));
}
[HttpGet, Route("computationStatus/{jobId}")]
[ProducesResponseType(StatusCodes.Status200OK, Type = typeof(JobModel))]
[ProducesResponseType(StatusCodes.Status404NotFound, Type = typeof(string))]
public async Task<IActionResult> GetComputationResultAsync(string jobId)
{
var job = await _computationJobStatusService.GetJobAsync(jobId).ConfigureAwait(false);
if(job != null)
{
return Ok(job);
}
return NotFound($"Job with ID `{jobId}` not found");
}
[HttpGet, Route("getAllJobs")]
[ProducesResponseType(StatusCodes.Status200OK,
Type = typeof(IReadOnlyDictionary<string, JobModel>))]
public async Task<IActionResult> GetAllJobsAsync()
{
return Ok(await _computationJobStatusService.GetAllJobsAsync().ConfigureAwait(false));
}
[HttpDelete, Route("clearAllJobs")]
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesResponseType(StatusCodes.Status401Unauthorized)]
public async Task<IActionResult> ClearAllJobsAsync([FromQuery] string permission)
{
if(permission == "this is flakey security so this can be run as a public demo")
{
await _computationJobStatusService.ClearAllJobsAsync().ConfigureAwait(false);
return Ok();
}
return Unauthorized();
}
}
工作示例
只要这个问题是活跃的,我就会维护一个你可以尝试的工作示例。对于此特定示例,您可以指定要运行的迭代次数。为了模拟长时间运行的工作,每次迭代为 1 秒。因此,如果将迭代值设置为 60,它将运行该作业 60 秒。
在它运行时,运行computationStatus/{jobId}
orgetAllJobs
端点。您可以实时观看所有工作更新。
这个例子远不是一个功能齐全的覆盖所有边缘案例的全面准备生产的例子,但它是一个好的开始。
结论
在后端工作了几年后,我看到很多问题是由于不了解后端的所有“规则”而出现的。希望这个答案能对我过去遇到的问题有所了解,并希望这可以使您不必处理上述问题。