0

我遇到了一个问题,我的 ConcurrentQueue 在单例中似乎没有以正确的顺序处理项目。我知道它是 FIFO,所以我想也许内存中的队列不一样,或者我的 Dequeue 出了什么问题?我测试的方法是快速向我的 API 端点发出 3 个邮递员请求。如果有人可以帮助我理解为什么这些没有相互运行,我将不胜感激!

截至目前,我倾向于使用 Queue.TryPeek 无法正常工作,因为第二个和第三个请求似乎在第一个请求出列之前排队。

因此,当我运行以下代码时,我会在控制台中看到以下输出。

Queued message: Test 1
Sending message: Test 1
Queued message: Test 2
Sending message: Test 2
Dequeuing message: Test 2
Returning response: Test 2
Queued message: Test 3
Sending message: Test 3
Dequeuing message: Test 1
Returning response: Test 1
Dequeuing message: Test 3
Returning response: Test 3

这是我的 API 控制器方法,它正在获取一条消息并将该消息排队,一旦消息排队,它将等到它在前面看到该请求的消息,然后发送它,然后将其出列。

控制器

[HttpPost]
[Route("message")]
public IActionResult SendMessageUser([FromBody]Message request)
{
    Console.WriteLine($"Queued message: {request.Message}");
    _messageQueue.QueueAndWaitForTurn(request);
    Console.WriteLine($"Sending message: {request.Message}");
    var sendMessageResponse = _messageService.SendMessageToUser(request.Name, request.Message);
    Console.WriteLine($"Dequeuing message: {request.Message}");
    _messageQueue.DequeueMessage(request);
    Console.WriteLine($"Returning response: {request.Message}");
    return Ok(sendMessageResponse);
}

至于队列,我将它绑定到 IoC,如下所示:

public void ConfigureServices(IServiceCollection services)
{
    services.AddSingleton<IMessageQueue, MessageQueue>();
    services.AddScoped<IMessageService, MessageService>();

    services.AddMvc();
}

这是我的队列类单例,我在这里使用单例,因为我希望在应用程序的整个生命周期中只有这个队列的 1 个实例。

public class MessageQueue : IMessageQueue
{
    private Lazy<ConcurrentQueue<Message>> _queue = 
        new Lazy<ConcurrentQueue<Message>>(new ConcurrentQueue<Message>());

    public ConcurrentQueue<Message> Queue
    {
        get
        {
            return _queue.Value;
        }
    }

    public void QueueAndWaitForTurn(Message message)
    {
        Queue.Enqueue(message);

        WaitForTurn();
    }

    public bool DequeueMessage(Message message)
    {
        var messageIsDequeued = Queue.TryDequeue(out message);

        return messageIsDequeued;
    }

    public void WaitForTurn()
    {
        Message myMessage = null;
        var myMessageIsNext = Queue.TryPeek(out myMessage);

        while (!Queue.TryPeek(out myMessage))
        {
            Thread.Sleep(1000);
            WaitForTurn();
        }
    }
}
4

1 回答 1

1

我会创建一种 FifoSemaphore:

public class FifoSemaphore : IDisposable
{
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
    private readonly Queue<TaskCompletionSource<object>> _taskQueue = new Queue<TaskCompletionSource<object>>(10);
    private readonly object _lockObject = new object();

    public async Task WaitAsync()
    {
        // Enqueue a task
        Task resultTask;
        lock (_lockObject)
        {
            var tcs = new TaskCompletionSource<object>();
            resultTask = tcs.Task;
            _taskQueue.Enqueue(tcs);
        }

        // Wait for the lock
        await _semaphore.WaitAsync();

        // Dequeue the next item and set it to resolved (release back to API call)
        TaskCompletionSource<object> queuedItem;
        lock (_lockObject)
        {
            queuedItem = _taskQueue.Dequeue();
        }
        queuedItem.SetResult(null);

        // Await our own task
        await resultTask;
    }

    public void Release()
    {
        // Release the semaphore so another waiting thread can enter
        _semaphore.Release();
    }

    public void Dispose()
    {
        _semaphore?.Dispose();
    }
}

然后像这样使用它:

[HttpPost]
[Route("message")]
public async Task<IActionResult> SendMessageUser([FromBody]Message request)
{
    try
    {
        await _fifoSemaphore.WaitAsync();
        // process message code here
    }
    finally // important to have a finally to release the semaphore, so that even in the case of an exception, it can continue to process the next message
    {
        _fifoSemaphore.Release();
    }
}

这个想法是每个等待的项目将首先排队。

接下来,我们等待信号量让我们进入(我们的信号量一次允许一个项目)。

然后我们将下一个等待项出列,并将其释放回 API 方法。

最后,我们等待自己在队列中的位置完成,然后返回 API 方法。

在 API 方法中,我们异步等待轮到我们,做我们的任务,然后返回。包含一个 try/finally 以确保即使在失败的情况下也会为后续消息释放信号量。

于 2018-07-11T06:19:37.483 回答