如果您不想增加池的大小并假设您SocketAsyncEventArgs
在使用后正确返回每个池,则可以使用BlockingCollection来保存所需数量的SocketAsyncEventArgs
. 当没有更多的项目可以消费时,消费者将阻塞,直到一个项目返回到集合中。
更新
这是一些创建BlockingCollection
大小为 1 的示例代码并触发一些消费者同时处理。每个消费者从集合中取出一个项目来处理它,同时其他消费者被阻止,Take
直到一个项目被添加回集合中。
处理时,您可能需要在一个try/finally
块中执行此操作,以确保在处理项目后,如果抛出异常,则始终将其添加回来。
要关闭您调用的集合CompleteAdding()
,任何阻塞Take
的方法都会抛出一个InvalidOperationException
public void RunConsumer(BlockingCollection<SocketAsyncEventArgs> collection, int consumerId)
{
Task.Run( async () =>
{
Console.WriteLine("Consumer {0} waiting", consumerId);
SocketAsyncEventArgs args = null;
try
{
args = collection.Take();
Console.WriteLine("Consumer {0} processing", consumerId);
await Task.Delay(5000);
}
catch(ObjectDisposedException)
{
Console.WriteLine("Consumer {0} collection has been disposed", consumerId);
}
catch(InvalidOperationException)
{
Console.WriteLine("Consumer {0} collection has been closed", consumerId);
}
finally
{
// add the item back if collection hasn't been closed.
if(args != null && !collection.IsAddingCompleted)
collection.Add(args);
}
Console.WriteLine("Consumer {0} finished", consumerId);
});
}
用法
void Main()
{
var collection = new BlockingCollection<SocketAsyncEventArgs>(1) { new SocketAsyncEventArgs() };
RunConsumer(collection, 1);
RunConsumer(collection, 2);
RunConsumer(collection, 3);
Thread.Sleep(9000);
collection.CompleteAdding();
Console.ReadLine();
}
输出
Consumer 1 waiting
Consumer 3 waiting
Consumer 2 waiting
Consumer 1 processing
Consumer 1 finished
Consumer 3 processing
Consumer 2 collection has been closed
Consumer 2 finished
Consumer 3 finished