我有一个HostedService
继承自BackgroundService
. 它循环并从Queue
.
这些项目通过Controller
来自 http 的请求放置在队列中。
如果我将 Postman runner 设置为每 500 毫秒触发一个项目,最多 60 个项目。一些早期的项目在几秒钟后出列,但后来可能需要 10 秒才能出列。
我试过了Queue
:ConcurrentQueue
和BlockingCollection
。所有结果都相同。
有任何想法吗?这是我提到的任何队列类型的有效用例吗?
以下是实现细节:
登记:
services.AddSimpleInjector(_container, options =>
{
options.AddAspNetCore().AddControllerActivation();
options.AddHostedService<QueuedHostedService>();
});
后台服务:
public class QueuedHostedService : BackgroundService
{
private readonly IApiLog _apiLog;
public QueuedHostedService(IBackgroundTaskQueue taskQueue,
IApiLog apiLog)
{
TaskQueue = taskQueue;
_apiLog = apiLog;
}
public IBackgroundTaskQueue TaskQueue { get; }
protected override async Task ExecuteAsync(
CancellationToken cancellationToken)
{
_apiLog.Log(new LogEntry("Queued Hosted Service is starting."));
while (!cancellationToken.IsCancellationRequested)
{
var workItem = await TaskQueue.Dequeue(cancellationToken);
_apiLog.Log(new LogEntry($"Dequeuing work-item: {nameof(workItem)}"));
try
{
await workItem(cancellationToken);
}
catch (Exception exception)
{
_apiLog.Log(new LogEntry(exception, $"Error occurred executing {nameof(workItem)}."));
}
}
_apiLog.Log(new LogEntry("Queued Hosted Service is stopping."));
}
}
队列:
public class BackgroundTaskQueue : IBackgroundTaskQueue
{
private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);
private readonly IApiLog _apiLog;
private readonly Queue<Func<CancellationToken, Task>> _items = new Queue<Func<CancellationToken, Task>>();
public BackgroundTaskQueue(IApiLog apiLog)
{
_apiLog = apiLog;
}
public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem, string eventId, string correlationId)
{
try
{
if (workItem == null)
{
throw new ArgumentNullException(nameof(workItem));
}
_items.Enqueue(workItem);
_apiLog.Log(new LogEntry($"BackgroundWorkItem has been enqueued for EventId={eventId}, CorrelationId={correlationId}"));
}
catch (Exception exception)
{
_apiLog.Log(new LogEntry(exception, exception.Message));
}
finally
{
_signal.Release();
}
}
public async Task<Func<CancellationToken, Task>> Dequeue(CancellationToken cancellationToken)
{
await _signal.WaitAsync(cancellationToken);
_items.TryDequeue(out var workItem);
return workItem;
}
}
在控制器中,最终将项目放入队列的方法:
public void AddRequestToQueue(MyEvent myEvent, string correlationId, string userName)
{
if (string.IsNullOrEmpty(correlationId)) correlationId = Guid.NewGuid().ToString();
_apiLog.Log(new LogEntry($"Adding Update Event Request to Queue. OperationType={OperationType.ToString()}, EventId={myEvent.Id}, CorrelationId={ correlationId }"));
BackGroundTaskQueue.QueueBackgroundWorkItem(async token =>
{
_apiLog.Log(new LogEntry($"Update Event Request Dequeued. OperationType={OperationType.ToString()}, EventId={myEvent.Id}, CorrelationId={ correlationId }"));
await AddSomethingToDatabase(myEvent, correlationId, userName);
var event = _converter.Convert<SomethingElse>(myEvent);
await SendSomethingToRabbit(event, correlationId, OperationType);
}, myEvent.Id.ToString(), correlationId);
}
我在日志行之间看到最多 10 秒:
将更新事件请求添加到队列
和
更新事件请求出队