1

我正试图解决 BlockingCollection 和我的生产者/消费者问题。

我想要实现的目标如下:

  • 一种线程安全队列,以 FIFO 方式保存对象列表(“作业”)。
  • 第二个线程安全队列,也以 FIFO 方式保存这些作业的结果列表。

换句话说:

Inbound "Job" Data, can come at any time from multiple threads 
   ==> Thread-Safe FIFO Queue 1 "FQ1"
      ==> Async Processing of data in FQ1 (and remove item from FQ1)
         ==> Callback/Results into Thread-Safe FIFO Queue 2 "FQ2"
            ==> Async Processing of data in FQ2 (and remove item from FQ2)
               ==> Done

到目前为止,我卑微的尝试是:

private BlockingCollection<InboundObject> fq1;
private BlockingCollection<ResultObject> fq2;

(...)

Task.Factory.StartNew(() =>
{
    foreach (InboundObject a in fq1.GetConsumingEnumerable())
       a.DoWork(result => fq2.Add(result)); //a.DoWork spits out an Action<ResultObject>
}

我选择 BlockingCollection 的原因之一是因为我想将负载保持在最低限度,这意味着只有当项目实际上在集合内时才工作(而不处理等待/睡眠)。我不确定 foreach 是否是正确的方法。

请让我知道这是否正确或是否有更好的方法。谢谢!

编辑 我可以从单元测试中看出,任务中的工作实际上是同步的。新版本如下:

Task.Factory.StartNew(() =>
{
    foreach (InboundObject a in fq1.GetConsumingEnumerable())
       Task.Factory.StartNew(async () => { fq2.Add(await a.DoWork()); });
}

非常感谢输入!

4

1 回答 1

1

我选择 BlockingCollection 的原因之一是因为我想将负载保持在最低限度,这意味着仅当项目实际上在集合内时才工作(而不处理等待/睡眠)。我不确定 foreach 是否是正确的方法。

这是正确的方法,foreach将被阻塞,直到将新项目添加到队列或CompleteAdding调用方法。不对的是你想用 BlockingCollection 实现异步处理。BlockingCollection 是一个简单的生产者/消费者队列,当您需要维护作业和作业结果的处理顺序时必须使用它。因为它是同步的。作业将按照添加的顺序进行处理。

如果您只需要异步执行,则不需要队列。在这种情况下,您可以使用 TPL,只需为每个作业生成一个新任务,它们将由 TPL 在内部排队,并将使用您的系统可以有效处理的尽可能多的操作系统线程。例如,您的工作可以产生自己的任务。这是更灵活的方法。

此外,生产者/消费者队列可用于组织作业的管道执行。在这种情况下,作业必须分成几个步骤。每个步骤都必须由专用线程执行。在每个作业步骤线程中,我们必须从一个队列中读取作业,执行该作业,然后将其排入下一个队列。

interface IJob
{
    void Step1();
    void Step2();
    ...
}

var step1 = new BlockingCollection<IJob>();
var step2 = new BlockingCollection<IJob>();
...

Task.Factory.StartNew(() =>
    {
        foreach(var step in step1.GetConsumingEnumerable()) {
            step.Step1();
            step2.Add(step);
        }
    });

Task.Factory.StartNew(() =>
    {
        foreach(var step in step2.GetConsumingEnumerable()) {
            // while performing Step2, another thread can execute Step1
            // of the next job
            step.Step2();
            step3.Add(step);
        }
    });

在这种情况下,作业将按 FIFO 顺序但并行执行。但是如果要做流水线处理,首先要考虑负载均衡。如果其中一个步骤花费了太多时间,它的队列会变大,而其他线程大部分时间都会处于空闲状态。

于 2012-12-17T10:35:57.590 回答