0

我有一个继承自 BackgroundService 的 HostedService。它在循环中不断从 Queue 中取出项目并执行。

这些项目由 Controller 在处理 HTTP 请求时放入队列。

如果我将 Postman runner 设置为每 500 毫秒发送一个请求、总共 60 个请求,那么较早的项目会在几秒钟后出队,但后面的项目可能需要长达 10 秒才能出队。

我试过了 Queue、ConcurrentQueue 和 BlockingCollection,结果都相同。

有任何想法吗?这是我提到的任何队列类型的有效用例吗?

以下是实现细节:

注册:

    // Wire Simple Injector into the ASP.NET Core service collection.
    services.AddSimpleInjector(_container, simpleInjectorOptions =>
    {
        // Enable the ASP.NET Core integration first, then turn on
        // controller activation on the builder it returns.
        var aspNetCoreBuilder = simpleInjectorOptions.AddAspNetCore();
        aspNetCoreBuilder.AddControllerActivation();

        // Register QueuedHostedService as a hosted service.
        simpleInjectorOptions.AddHostedService<QueuedHostedService>();
    });

后台服务:

/// <summary>
/// Background service that repeatedly takes work items from an
/// <see cref="IBackgroundTaskQueue"/> and awaits them one at a time.
/// </summary>
public class QueuedHostedService : BackgroundService
{
    private readonly IApiLog _apiLog;

    /// <summary>Creates the service.</summary>
    /// <param name="taskQueue">Queue the work items are taken from.</param>
    /// <param name="apiLog">Log that receives start/stop, dequeue and failure entries.</param>
    public QueuedHostedService(IBackgroundTaskQueue taskQueue, 
        IApiLog apiLog)
    {
        TaskQueue = taskQueue;
        _apiLog = apiLog;
    }

    /// <summary>The queue this service takes its work items from.</summary>
    public IBackgroundTaskQueue TaskQueue { get; }

    /// <summary>
    /// Dequeues and awaits work items until <paramref name="cancellationToken"/>
    /// is cancelled. A failing work item is logged and does not stop the loop.
    /// </summary>
    /// <param name="cancellationToken">Signals that the service should stop.</param>
    protected override async Task ExecuteAsync(
        CancellationToken cancellationToken)
    {
        _apiLog.Log(new LogEntry("Queued Hosted Service is starting."));

        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                var workItem = await TaskQueue.Dequeue(cancellationToken);

                // Nothing to run: skip instead of invoking a null delegate,
                // which would only surface as a NullReferenceException below.
                if (workItem == null)
                {
                    _apiLog.Log(new LogEntry("Dequeued an empty work-item; skipping."));
                    continue;
                }

                // NOTE: nameof(workItem) yields the literal text "workItem",
                // not any identifying detail of the item itself.
                _apiLog.Log(new LogEntry($"Dequeuing work-item: {nameof(workItem)}"));

                try
                {
                    await workItem(cancellationToken);
                }
                catch (Exception exception)
                {
                    _apiLog.Log(new LogEntry(exception, $"Error occurred executing {nameof(workItem)}."));
                }
            }
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Cancellation while waiting on the queue is the normal shutdown
            // path; fall through so the stopping entry below is still written.
        }

        _apiLog.Log(new LogEntry("Queued Hosted Service is stopping."));
    }
}

队列:

    /// <summary>
    /// Queue of asynchronous work items. Producers add items with
    /// <see cref="QueueBackgroundWorkItem"/>; a consumer awaits
    /// <see cref="Dequeue"/> to receive them in FIFO order.
    /// </summary>
    public class BackgroundTaskQueue : IBackgroundTaskQueue
    {
        // Released once per successfully enqueued item; Dequeue waits on it.
        private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);
        private readonly IApiLog _apiLog;

        // ConcurrentQueue rather than Queue: items are enqueued and dequeued
        // from different threads, and Queue<T> is not safe for that.
        private readonly System.Collections.Concurrent.ConcurrentQueue<Func<CancellationToken, Task>> _items =
            new System.Collections.Concurrent.ConcurrentQueue<Func<CancellationToken, Task>>();

        /// <summary>Creates the queue.</summary>
        /// <param name="apiLog">Log that receives enqueue and failure entries.</param>
        public BackgroundTaskQueue(IApiLog apiLog)
        {
            _apiLog = apiLog;
        }

        /// <summary>
        /// Adds <paramref name="workItem"/> to the queue and signals the consumer.
        /// Failures (including a null <paramref name="workItem"/>) are logged and
        /// not rethrown; in that case nothing is enqueued and nothing is signalled.
        /// </summary>
        /// <param name="workItem">Delegate to run later.</param>
        /// <param name="eventId">Event id, used only in the log entry.</param>
        /// <param name="correlationId">Correlation id, used only in the log entry.</param>
        public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem, string eventId, string correlationId)
        {
            var enqueued = false;

            try
            {
                if (workItem == null)
                {
                    throw new ArgumentNullException(nameof(workItem));
                }

                _items.Enqueue(workItem);
                enqueued = true;

                _apiLog.Log(new LogEntry($"BackgroundWorkItem has been enqueued for EventId={eventId}, CorrelationId={correlationId}"));
            }
            catch (Exception exception)
            {
                _apiLog.Log(new LogEntry(exception, exception.Message));
            }
            finally
            {
                // Signal only when an item was really added; otherwise Dequeue
                // would wake up, find nothing, and hand back null.
                if (enqueued)
                {
                    _signal.Release();
                }
            }
        }

        /// <summary>
        /// Waits until an item has been signalled, then removes and returns it.
        /// </summary>
        /// <param name="cancellationToken">Cancels the wait.</param>
        /// <returns>The next work item.</returns>
        public async Task<Func<CancellationToken, Task>> Dequeue(CancellationToken cancellationToken)
        {
            await _signal.WaitAsync(cancellationToken);

            _items.TryDequeue(out var workItem);

            return workItem;
        }
    }

在控制器中,最终将项目放入队列的方法:

/// <summary>
/// Logs the request and queues a work item for it. When the work item runs it
/// logs the dequeue, awaits AddSomethingToDatabase, converts the event to
/// SomethingElse and awaits SendSomethingToRabbit.
/// </summary>
/// <param name="myEvent">Event to process.</param>
/// <param name="correlationId">Correlation id; a new GUID is generated when null or empty.</param>
/// <param name="userName">Passed through to AddSomethingToDatabase.</param>
public void AddRequestToQueue(MyEvent myEvent, string correlationId, string userName)
{
    if (string.IsNullOrEmpty(correlationId))
    {
        correlationId = Guid.NewGuid().ToString();
    }

    _apiLog.Log(new LogEntry($"Adding Update Event Request to Queue. OperationType={OperationType.ToString()}, EventId={myEvent.Id}, CorrelationId={ correlationId }"));

    // The token handed to the work item is not forwarded to the awaited calls.
    BackGroundTaskQueue.QueueBackgroundWorkItem(async token =>
    {
        _apiLog.Log(new LogEntry($"Update Event Request Dequeued. OperationType={OperationType.ToString()}, EventId={myEvent.Id}, CorrelationId={ correlationId }"));

        await AddSomethingToDatabase(myEvent, correlationId, userName);

        // "event" is a reserved C# keyword and cannot be used as a local name.
        var convertedEvent = _converter.Convert<SomethingElse>(myEvent);

        await SendSomethingToRabbit(convertedEvent, correlationId, OperationType);
    }, myEvent.Id.ToString(), correlationId);
}

我看到下面这两条日志之间的间隔最长可达 10 秒:

将更新事件请求添加到队列(Adding Update Event Request to Queue)

更新事件请求出队(Update Event Request Dequeued)

4

0 回答 0