1

我正在为 Web 服务编写一个相对简单的“代理”应用程序。总体思路是 TCP 服务器(带异步连接)将从客户端读取(字符串)数据并将该数据(作为读取回调函数的一部分)放入两个队列(Q1 和 Q2)之一。另一个线程将读取这些队列中的数据并将其传递给 Web 服务。Q1 中的数据应优先于 Q2 中可能存在的任何数据。

我一直在阅读有关生产者/消费者模式的信息,这似乎就是我试图在队列方面实现的内容。由于我的入队和出队操作将发生在不同的线程上,显然我的队列需要是线程安全的并支持某种锁定机制?这是一个 .NET 4.0 应用程序,我看到了有关新 BlockingCollection 和 ConcurrentQueue 类的文档,但我不确定到底有什么区别,或者我将如何在这种情况下实现它们。任何人都可以对此有所了解吗?谢谢!

4

2 回答 2

3

我会像下面的课程那样做。Enqueue()当您生产一个项目以将其添加到其中一个队列时,您调用它。此方法总是(几乎)立即返回。在另一个线程中,您Dequeue()在准备好使用某个项目时调用。它尝试首先从高优先级队列中获取。如果此时任何队列中都没有可用的项目,则调用阻塞。完成制作后,调用Complete(). 在进行该调用并且两个队列都为空之后,下一次调用(或当前阻塞的调用)Dequeue()throws InvalidOperationException

如果您的生产者可以在很长一段时间内比您的消费者更快,您应该绑定队列(new BlockingCollection<T>(capacity))。但在这种情况下,如果您只有一个线程同时产生低优先级和高优先级项目,则高优先级项目可能不得不等待低优先级项目。您可以通过让一个线程用于生成高优先级项目和一个用于低优先级项目的线程来解决此问题。或者您可以只绑定高优先级队列,并希望您不会一次获得一百万个低优先级项目。

class Worker<T>
{
    BlockingCollection<T> m_highPriorityQueue = new BlockingCollection<T>();
    BlockingCollection<T> m_lowPriorityQueue = new BlockingCollection<T>();

    public void Enqueue(T item, bool highPriority)
    {
        BlockingCollection<T> queue;
        if (highPriority)
            queue = m_highPriorityQueue;
        else
            queue = m_lowPriorityQueue;

        queue.Add(item);
    }

    public T Dequeue()
    {
        T result;

        if (!m_highPriorityQueue.IsCompleted)
        {
            if (m_highPriorityQueue.TryTake(out result))
                return result;
        }

        if (!m_lowPriorityQueue.IsCompleted)
        {
            if (m_lowPriorityQueue.TryTake(out result))
                return result;
        }

        if (m_highPriorityQueue.IsCompleted && m_lowPriorityQueue.IsCompleted)
            throw new InvalidOperationException("All work is done.");
        else
        {
            try
            {
                BlockingCollection<T>.TakeFromAny(
                    new[] { m_highPriorityQueue, m_lowPriorityQueue },
                    out result);
            }
            catch (ArgumentException ex)
            {
                throw new InvalidOperationException("All work is done.", ex);
            }

            return result;
        }
    }

    public void Complete()
    {
        m_highPriorityQueue.CompleteAdding();
        m_lowPriorityQueue.CompleteAdding();
    }
}
于 2011-03-31T21:10:23.633 回答
1

BlockingCollection 默认使用 ConcurrentQueue。应该非常适合您的应用程序。如果您将 F# 与邮箱和异步块一起使用,可能会更容易。我之前发布了一个常见实现的示例帖子。

使用 F# Agent 或 MailboxProcessor 映射/减少

于 2011-03-31T18:11:16.003 回答