4

我真的不知道如何解决这个问题,但我订阅了自定义类中触发的事件,理想情况下我希望将它们排队并在它们进来时先入先出处理它们。我知道Queue<T>并且我认为我应该用这个?但是我的问题是在收到我的消息时的事件处理程序中,我是否只是Enqueue()到那里的队列,如果是这样,那么当添加新项目时如何处理队列?

我正在考虑在构造函数中调用一个方法,该方法执行类似(抓住你的帽子):

while (true)
{
  foreach (var item in myQueue)
  {
    // process
    myQueue.Dequeue();
  }
}

当然必须有一个更优雅的方式来做到这一点?这应该有效地击中 myQueue 并进行迭代,因为它包含元素并做我想做的事。表现会如何?我可以在单独的线程上生成它以避免任何线程阻塞,我真的有时间接受while (true)

4

8 回答 8

4

这是一个经典的生产者/消费者问题。快速的网络搜索显示http://msdn.microsoft.com/en-us/library/yy12yx1f(VS.80,loband).aspx,它完全涵盖了这一点。

你不想做一个 while(true) 循环,因为你的线程将消耗 100% 的 CPU,即使它没有工作,可能会饿死系统中的其他线程。

于 2009-04-16T01:13:56.207 回答
4

你不能这样做。如果在枚举器仍在使用时更改了基础集合,则为 foreach 返回的枚举器将引发异常。

本质上,您需要做的是设置另一个线程来处理事件。理想情况下,您会向这个其他线程(通过事件或其他同步机制)发出信号,表明有可用的工作。它将使工作项出列,处理它,然后休眠直到下一个信号进来。或者您可以使用轮询(定期唤醒并检查另一个项目),但这会降低效率。在任何情况下,您都需要使用锁定来确保您不会尝试同时修改两个线程中的队列。

于 2009-04-16T01:15:38.637 回答
2

如果您从多个线程运行它,则需要引入某种形式的锁定以防止队列同步出现问题。您可能还应该确保在将元素入队以及出队时锁定队列。

如果处理您的队列的线程除此之外什么都不做,那么您的基本代码可能没问题,除了处理队列当前为空的情况。您可能应该添加以下内容:

while (myQueue.Count == 0)
    Thread.Sleep(sleepTime);

这将为您提供一种“等待”的方法,直到您的事件填满您的队列。

此外,当您从队列中出列时,您将无法使用 foreach。您需要执行以下操作:

while (myQueue.Count == 0)
    Thread.Sleep(sleepTime);
while (myQueue.Count > 0)
{
    lock(myLock)
        myObject = myQueue.Dequeue();
    // do your work...
}

如果有人添加到您的队列中,这将防止集合修改问题,并使您无需锁定处理元素的整个时间。


编辑:我同意一些评论,这并不总是最有效的策略。

如果队列大部分是空的,并且偶尔只有元素,我会创建一个自定义类来包装它。

当队列为空/非空时,您可能会触发一个事件。一旦队列接收到项目,该事件就会触发一个线程来处理它并将其清空。一旦达到 0 个项目,它就会有一个事件来停止处理。

如果队列的状态大部分时间都是空的,这将比不断等待更有效。另一方面,如果队列几乎总是满的,并且处理线程很少跟上,为了简单起见,我会使用上面的方法。

于 2009-04-16T01:14:55.127 回答
1

我通常使用 ManualResetEvent 来表示元素已添加到集合中。

事件处理程序执行以下操作:

lock (myLock)
{
   myq.Enqueue(...);
   myqAddedSignal.Set();
}

处理线程等待信号 - 一旦发出信号,它就会清空队列,重置信号然后处理项目:

while (true)
{
   myqAddedSignal.WaitOne();
   lock (myLock)
   {
      // pull everything off myQ into a list
      // clear myQ
      myqAddedSignal.Reset();
   }

   foreach (obj in copyOfMyQ)
   {
      ...
   }
}

这将以线程安全的方式处理队列中的项目。唯一的共享状态是 myqAddedSignal - 对它的访问在 myLock 上同步(我通常只是将其设为对象)。

于 2009-04-16T01:35:54.220 回答
0

你在那里做的事情看起来不对劲。如果你真的在使用队列,那么你真的应该从队列中拉出项目,而不是迭代它:

while (!queue.empty()) // or whatever
{
  process the first item in the queue
}
于 2009-04-16T01:14:40.043 回答
0

您可以看到这个现有的答案,它有这样一个队列。

于 2009-04-16T03:45:22.940 回答
0

根据 Reed 给出的建议,我创建了一个自定义类并在队列为空并已填充时抛出事件。

自定义EventQueue<T>类:

public class EventQueue<T> : Queue<T>
{
    public delegate void OnQueueMadeEmptyDelegate();
    public event OnQueueMadeEmptyDelegate OnQueueMadeEmpty;
    public delegate void OnQueueMadeNonEmptyDelegate();
    public event OnQueueMadeNonEmptyDelegate OnQueueMadeNonEmpty;

    public new void Enqueue(T item)
    {
        var oldCount = Count;
        base.Enqueue(item);
        if (OnQueueMadeNonEmpty != null &&
            oldCount == 0 && Count > 0)
            // FIRE EVENT
            OnQueueMadeNonEmpty();
    }
    public new T Dequeue()
    {
        var oldCount = Count;
        var item = base.Dequeue();
        if (OnQueueMadeEmpty != null &&
            oldCount > 0 && Count == 0)
        {
            // FIRE EVENT
            OnQueueMadeEmpty();
        }
        return item;
    }
    public new void Clear()
    {
        base.Clear();
        if (OnQueueMadeEmpty != null)
        {
            // FIRE EVENT
            OnQueueMadeEmpty();
        }
    }
}

(我已删除 <summary> 以获取更小的代码示例。我使用“new”修饰符作为将附加逻辑附加到基本逻辑的一种方式)。

主类中的私人:

public delegate void InitQueueDelegate();
private InitQueueDelegate initQueueDelegate;

private EventQueue<QueueRequest> translationQueue;
private Object queueLock = new Object();

在主类构造函数中:

initQueueDelegate = this.InitQueue;
initQueueDelegate.BeginInvoke(null, null);

在主类主体中:

private void InitQueue()
{
    this.translationQueue = new EventQueue<QueueRequest>();
    this.translationQueue.OnQueueMadeEmpty += new EventQueue<QueueRequest>.OnQueueMadeEmptyDelegate(translationQueue_OnQueueMadeEmpty);
    this.translationQueue.OnQueueMadeNonEmpty += new EventQueue<QueueRequest>.OnQueueMadeNonEmptyDelegate(translationQueue_OnQueueMadeNonEmpty);
}

void translationQueue_OnQueueMadeNonEmpty()
{
    while (translationQueue.Count() > 0)
    {
        lock (queueLock)
        {
            QueueRequest request = translationQueue.Dequeue();
#if DEBUG
            System.Diagnostics.Debug.WriteLine("Item taken from queue...");
#endif
            // hard work
            ....
            ....
            ....
        }
    }
}

void translationQueue_OnQueueMadeEmpty()
{
    // empty queue
    // don't actually need to do anything here?
}

private void onMessageReceived(....)
{
  ....
  ....
  ....
  // QUEUE REQUEST
  lock (queueLock)
  {
    QueueRequest queueRequest = new QueueRequest
                                    {
                                        Request = request,
                                        Sender = sender,
                                        Recipient = tcpClientService
                                    };
    translationQueue.Enqueue(queueRequest);
#if DEBUG
    System.Diagnostics.Debug.WriteLine("Item added to queue...");
#endif
  }
}

最后是 QueueRequest 结构:

public struct QueueRequest
{
    public MessageTranslateRequest Request { get; set; }
    public TCPClientService Sender { get; set; }
    public TCPClientService Recipient { get; set; }
}

我知道那里有很多,但希望你们检查完整的实现。你怎么看?我执行锁定的方式是否正确?

如果这没问题,我将奖励 Reed,因为我的解决方案是根据他的想法创建的。

于 2009-04-16T04:08:35.663 回答
0

您正在寻找的结构是信号量。

您将项目添加到队列中,然后将计数添加到信号量。

您在另一个线程中等待信号量,并处理队列中的一个项目。

根据队列实现(如 BCL 之一),您必须在检索时锁定/解锁。

于 2009-04-16T05:28:45.547 回答