我有一个线程正在队列中添加数据,现在我希望其他线程在添加数据时得到通知,以便它可以开始处理队列中的数据。
一种选择是线程将连续轮询队列以查看计数是否大于零,但我认为这不是好方法,任何其他建议将不胜感激
任何建议我如何实现这一点,我正在使用.net framework 3.5。
如果我有两个线程一个正在做 q.Enqueue(data)
而另一个正在做q.dequeue()
,在这种情况下我需要管理锁..?
我有一个线程正在队列中添加数据,现在我希望其他线程在添加数据时得到通知,以便它可以开始处理队列中的数据。
一种选择是线程将连续轮询队列以查看计数是否大于零,但我认为这不是好方法,任何其他建议将不胜感激
任何建议我如何实现这一点,我正在使用.net framework 3.5。
如果我有两个线程一个正在做 q.Enqueue(data)
而另一个正在做q.dequeue()
,在这种情况下我需要管理锁..?
你可以ManualResetEvent
用来通知一个线程。
ManualResetEvent e = new ManualResetEvent(false);
每次q.enqueue();
执行后e.Set()
并在处理线程中,您等待带有e.WaitOne()
.
如果你在循环内进行处理,你应该e.Reset()
在e.WaitOne()
.
使用BlockingCollection
类。这样做的好处是,Take
如果队列为空,该方法会阻塞(不进行轮询)。它包含在 .NET 4.0+ 中,或者作为Reactive Extension下载的一部分,甚至可能是通过 NuGet 的 TPL backport。如果您愿意,可以使用该类的以下未优化变体。
public class BlockingCollection<T>
{
private readonly Queue<T> m_Queue = new Queue<T>();
public void Add(T item)
{
lock (m_Queue)
{
m_Queue.Enqueue(item);
Monitor.Pulse(m_Queue);
}
}
public T Take()
{
lock (m_Queue)
{
while (m_Queue.Count == 0)
{
Monitor.Wait(m_Queue);
}
return m_Queue.Dequeue();
}
}
public bool TryTake(out T item)
{
item = default(T);
lock (m_Queue)
{
if (m_Queue.Count > 0)
{
item = m_Queue.Dequeue();
}
}
return item != null;
}
}
我不使用队列,因为我宁愿批处理它们。这在您必须打开/关闭(日志)文件、打开/关闭数据库时更有用。这是我如何创建这样的示例:
// J. van Langen
public abstract class QueueHandler<T> : IDisposable
{
// some events to trigger.
ManualResetEvent _terminating = new ManualResetEvent(false);
ManualResetEvent _terminated = new ManualResetEvent(false);
AutoResetEvent _needProcessing = new AutoResetEvent(false);
// my 'queue'
private List<T> _queue = new List<T>();
public QueueHandler()
{
new Thread(new ThreadStart(() =>
{
// what handles it should wait on.
WaitHandle[] handles = new WaitHandle[] { _terminating, _needProcessing };
// while not terminating, loop (0 timeout)
while (!_terminating.WaitOne(0))
{
// wait on the _terminating and the _needprocessing handle.
WaitHandle.WaitAny(handles);
// my temporay array to store the current items.
T[] itemsCopy;
// lock the queue
lock (_queue)
{
// create a 'copy'
itemsCopy = _queue.ToArray();
// clear the queue.
_queue.Clear();
}
if (itemsCopy.Length > 0)
HandleItems(itemsCopy);
}
// the thread is done.
_terminated.Set();
})).Start();
}
public abstract void HandleItems(T[] items);
public void Enqueue(T item)
{
// lock the queue to add the item.
lock (_queue)
_queue.Add(item);
_needProcessing.Set();
}
// batch
public void Enqueue(IEnumerable<T> items)
{
// lock the queue to add multiple items.
lock (_queue)
_queue.AddRange(items);
_needProcessing.Set();
}
public void Dispose()
{
// let the thread know it should stop.
_terminating.Set();
// wait until the thread is stopped.
_terminated.WaitOne();
}
}
对于_terminating
/_terminated
我使用 aManualResetEvent
因为那些只是设置的。
对于_needProcessing
我使用AutoResetEvent
它不能通过 ManualResetEvent 完成,因为当它被触发时,另一个线程可以Set
再次执行,所以如果你Reset
在 WaitHandle.WaitAny 之后执行它,你可以撤消新添加的项目。(嗯,如果有人可以更容易地解释这一点,欢迎您。:)
例子:
public class QueueItem
{
}
public class MyQueue : QueueHandler<QueueItem>
{
public override void HandleItems(QueueItem[] items)
{
// do your thing.
}
}
public void Test()
{
MyQueue queue = new MyQueue();
QueueItem item = new QueueItem();
queue.Enqueue(item);
QueueItem[] batch = new QueueItem[]
{
new QueueItem(),
new QueueItem()
};
queue.Enqueue(batch);
// even on dispose, all queued items will be processed in order to stop the QueueHandler.
queue.Dispose();
}
我认为 BlockingCollection 会比 Queue 做得更好。除此之外,连续检查队列大小(并在线程为零时暂停线程)是非常好的方法。
顺便说一句,我们在这里谈论生产者-消费者模式。我想你可以用谷歌搜索其他一些方法。