2

从多个线程获取消息到队列并让一个单独的线程一次处理该队列的项目的最佳方法是什么?

在尝试从多个线程断开活动时,我经常使用这种模式。

我为此使用了 BlockingCollection,如下面的代码摘录所示:

// start this task in a static constructor
Task.Factory.StartNew(() => ProcessMultiUseQueueEntries(), TaskCreationOptions.LongRunning);


private static BlockingCollection<Tuple<XClientMsgExt, BOInfo, string, BOStatus>> _q = new BlockingCollection<Tuple<XClientMsgExt, BOInfo, string, BOStatus>>();

    /// <summary>
    /// queued - Simple mechanism that will log the fact that this user is sending an xMsg (FROM a user)
    /// </summary>
    public static void LogXMsgFromUser(XClientMsgExt xMsg)
    {
        _q.Add(new Tuple<XClientMsgExt, BOInfo, string, BOStatus>(xMsg, null, "", BOStatus.Ignore));
    }

    /// <summary>
    /// queued - Simple mechanism that will log the data being executed by this user
    /// </summary>
    public static void LogBOToUser(BOInfo boInfo)
    {
        _q.Add(new Tuple<XClientMsgExt, BOInfo, string, BOStatus>(null, boInfo, "", BOStatus.Ignore));
    }

    /// <summary>
    /// queued - Simple mechanism that will log the status of the BO being executed by this user (causes the red square to flash)
    /// </summary>
    public static void LogBOStatus(string UserID, BOStatus status)
    {
        _q.Add(new Tuple<XClientMsgExt, BOInfo, string, BOStatus>(null, null, UserID, status));
    }

    /// <summary>
    /// An endless thread that will keep checking the Queue for new entrants.
    /// NOTE - no error handling since this can't fail... :) lol etc 
    /// </summary>
    private static void ProcessMultiUseQueueEntries()
    {
        while (true)        //  eternal loop
        {
            Tuple<XClientMsgExt, BOInfo, string, BOStatus> tuple = _q.Take();

            // Do stuff

        }
    }

这很好用——所以我想——直到 VS2010 中的性能向导开始突出显示_q.Take()行作为我代码中的最高争用行!

注意我还使用了标准的 ConcurrentQueue 和 ManualResetEvent 组合,每次我将一个项目插入队列时,我都会发出 resetevent 信号,允许工作线程检查和处理队列,但这也具有在 . WaitOne() 方法...

是否有其他方法可以解决这种让许多线程将对象添加到并发队列中的常见模式 - 并且让一个线程在其自己的时间一次一个地遍历项目......

谢谢!!

4

3 回答 3

6

最高争夺线?是的,因为它是一个阻塞集合!该调用将阻塞(例如,它可能正在等待 a WaitHandle),直到将另一个元素添加到集合中。

你确定这是一个问题吗?这听起来正是我所期望的。

如果不清楚我的意思,请考虑以下代码:

var blocker = new BlockingCollection<int>();
int nextItem = blocker.Take();

您希望Take上面的调用运行多长时间?我希望它永远等待,因为没有任何东西被添加到blocker. 因此,如果我分析了上述代码的“性能”,我会Take在长时间运行的方法列表的顶部看到。但这并不表示存在问题;再次:您希望阻止该呼叫。


在一个完全独立的注释中,我是否建议用Tuple<XClientMsgExt, BOInfo, string, BOStatus>属性具有描述性名称的类型替换?(当然,此建议与您的问题无关;这只是一般性建议。)

于 2011-03-18T18:25:22.833 回答
2

_q.Take()就是最高争用线本身是没有意义的。如果有很多线程在等待项目,就会发生争用。最大的问题是,这种争用是否会在性能方面让您付出代价。您需要找到以下几个问题的答案:

  1. 您是否能够足够快地处理项目以防止队列无限制地增长?
  2. 就 CPU 使用率而言,争用是否让您付出了代价?
  3. 如果您停止向集合中添加内容,性能向导是否仍会报告争用?
  4. 如果停止向集合中添加内容,CPU 使用率是否很高?

如果您能够阻止队列增长并且 CPU 资源没有用于获取项目,那么就没有问题。除非您从集合中读取的线程数超过了必要的线程数。

于 2011-03-18T18:27:34.727 回答
0

您是否考虑过使用实际的MSMQ?可能看起来有点矫枉过正,但它确实提供了你需要的东西。不是说您的应用程序不能既是作家又要有专用线程来读取和处理消息。

于 2011-03-18T18:17:43.697 回答