我必须分批执行 1000 条消息,例如操作 A、B、C。我可以并行执行这些操作。我为他们创建了群组。为了增加并行性,我在每个组中创建了子组。子组中的任务需要串行执行。但是两个子组可以并行执行。一批 1000 完成后,我必须做一些处理,即保存在 db 中。但我无法理解,如何等待所有任务完成(我对在 1000 次任务结束时等待中间不感兴趣)。欢迎任何建议。
public class OrderlyThreadPool<t> : IDisposable
{
BlockingCollection<t> _workingqueue = null;
Action<t> _handler = null;
public OrderlyThreadPool(int wrkerCount, Action<t> handler)
{
_workingqueue = new BlockingCollection<t>();
_handler = handler;
Worker worker = new Worker(wrkerCount, Process); //WorkerCount is always 1
worker.Start();
}
public void AddItem(t item)
{
_workingqueue.Add(item);
}
private void Process()
{
foreach (t item in _workingqueue.GetConsumingEnumerable())
{
_handler(item);
}
}
public void Dispose()
{
_workingqueue.CompleteAdding();
_workingqueue = null;
}
}
public class Worker
{
int _wrkerCount = 0;
Action _action = null;
public Worker(int workerCount, Action action)
{
_wrkerCount = workerCount;
_action = action;
}
public void Start()
{
// Create and start a separate Task for each consumer:
for (int i = 0; i < _wrkerCount; i++)
{
Task.Factory.StartNew(_action);
}
}
}
所以基本上我会为每个子组创建 OrderlyThreadPool。我是来自说源的接收消息,如果没有消息可用,它会阻塞。所以我的代码,看起来像
while(true)
{
var message = GetMsg();
foreach(OrderlyThreadPool<Msg> a in myList)
{
a.AddMsg(message);
}
if(msgCount > 1000)
{
Wait for all threads to finish work;
}
else
{
msgCount =msgCount+1;
}
}