我正试图解决 BlockingCollection 和我的生产者/消费者问题。
我想要实现的目标如下:
- 一种线程安全队列,以 FIFO 方式保存对象列表(“作业”)。
- 第二个线程安全队列,也以 FIFO 方式保存这些作业的结果列表。
换句话说:
Inbound "Job" Data, can come at any time from multiple threads
==> Thread-Safe FIFO Queue 1 "FQ1"
==> Async Processing of data in FQ1 (and remove item from FQ1)
==> Callback/Results into Thread-Safe FIFO Queue 2 "FQ2"
==> Async Processing of data in FQ2 (and remove item from FQ2)
==> Done
到目前为止,我卑微的尝试是:
private BlockingCollection<InboundObject> fq1;
private BlockingCollection<ResultObject> fq2;
(...)
Task.Factory.StartNew(() =>
{
foreach (InboundObject a in fq1.GetConsumingEnumerable())
a.DoWork(result => fq2.Add(result)); //a.DoWork spits out an Action<ResultObject>
}
我选择 BlockingCollection 的原因之一是因为我想将负载保持在最低限度,这意味着只有当项目实际上在集合内时才工作(而不处理等待/睡眠)。我不确定 foreach 是否是正确的方法。
请让我知道这是否正确或是否有更好的方法。谢谢!
编辑 我可以从单元测试中看出,任务中的工作实际上是同步的。新版本如下:
Task.Factory.StartNew(() =>
{
foreach (InboundObject a in fq1.GetConsumingEnumerable())
Task.Factory.StartNew(async () => { fq2.Add(await a.DoWork()); });
}
非常感谢输入!