7

我正在使用以下工作流程进行项目:

第一部分:

  • 事件异步到达并在阻塞队列中排队,我们称之为Q1
  • 线程从该队列中获取下一个可用项目
  • 项目最终并行运行 {N} 个任务
  • 每个任务将其结果排入第二个队列,我们​​称之为Q2
  • 当项目处理完成时,从队列中读取下一个项目。

第二部分:

  • 另一个线程一次读取Q2一个对象并处理结果

所以,这里的问题是,第一个队列中的每个项目最终都会并行运行大量任务,每个任务都将其结果排队。第二个队列必须按顺序处理,一次一个项目,但它被淹没了。


我的问题

我需要一种机制,使线程处理Q1等到Q2中的项目数低于特定阈值。实现这一目标的最佳方法是什么?有没有办法拥有事件驱动的解决方案而不是轮询解决方案?

4

3 回答 3

9

Queue<T>您可以将 aBlockingCollection<T>用于 Q2 ,而不是使用 a 。如果您设置它BoundedCapacity,调用Q2.Add()将在达到容量时阻塞。这将自动限制 Q1 的处理,因为如果 N 个任务无法添加到最终队列中,它们将开始阻塞。

于 2013-03-06T17:04:12.073 回答
2

我假设您在偶尔的洪水中收到数据,在长时间干旱期间,第二季度可以赶上。您是否考虑过通过为这些任务使用有限的线程池来简单地限制从 Q1 产生的并发线程的数量?

如果在到达时很容易确定作业大小,我怀疑您可以从多个线程池中受益。您可以有少量线程来处理大型作业,而大量线程准备好处理小型作业。即使是第三个中间队列也可能是有益的。

于 2013-03-06T17:05:07.043 回答
1

您的问题似乎是TPL Dataflow图书馆解决的完美示例。如果你愿意尝试,这里是它的工作原理(当然这是一个非常简单的例子):

TransformBlock<int, bool> transform = new TransformBlock<int, bool>(i => i > 5 ? true : false,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
ActionBlock<bool> resultBlock = new ActionBlock<bool>(b => Console.WriteLine("My result is : " + b),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });
transform.LinkTo(resultBlock);

您正在定义一个转换块,它将使您的转换(这作为您的Q1),您可以将它的并行度级别设置为您想要使用的任务数。

然后,您正在创建第二个块(作为您的Q2),它将BoundedCapacity设置并同步处理每条消息,为每个元素调用一个操作。该块可以被任何其他块替换,例如BufferBlock允许您根据需要从中进行轮询。

于 2013-03-06T17:09:29.223 回答