我想在需要按顺序(在每个流中)处理的多个流中对相关任务进行排队。这些流可以并行处理。
具体来说,假设我需要两个队列,并且我希望每个队列中的任务按顺序处理。以下是用于说明所需行为的示例伪代码:
Queue1_WorkItem wi1a=...;
enqueue wi1a;
... time passes ...
Queue1_WorkItem wi1b=...;
enqueue wi1b; // This must be processed after processing of item wi1a is complete
... time passes ...
Queue2_WorkItem wi2a=...;
enqueue wi2a; // This can be processed concurrently with the wi1a/wi1b
... time passes ...
Queue1_WorkItem wi1c=...;
enqueue wi1c; // This must be processed after processing of item wi1b is complete
这是一个带有箭头的图表,说明了工作项之间的依赖关系:
问题是如何使用 C# 4.0/.NET 4.0 做到这一点?现在我有两个工作线程,每个队列一个BlockingCollection<>
,每个队列使用一个。我想改为利用 .NET 线程池并让工作线程同时(跨流)处理项目,但在流中串行处理。换句话说,我希望能够指出例如 wi1b 取决于 wi1a 的完成,而不必跟踪完成并记住 wi1a,当 wi1b 到达时。换句话说,我只想说,“我想为 queue1 提交一个工作项,该工作项将与我已经为 queue1 提交的其他项串行处理,但可能与提交到其他队列的工作项并行处理”。
我希望这个描述是有道理的。如果没有,请随时在评论中提问,我会相应地更新这个问题。
谢谢阅读。
更新:
总结到目前为止“有缺陷”的解决方案,以下是我无法使用的答案部分的解决方案以及我无法使用它们的原因:
TPL 任务需要为ContinueWith()
. 我不想在提交新任务时保留每个队列的先前任务的知识。
TDF ActionBlocks 看起来很有希望,但似乎发布到 ActionBlock 的项目是并行处理的。我需要对特定队列的项目进行连续处理。
更新 2:
回复:动作块
似乎将MaxDegreeOfParallelism
选项设置为 one 会阻止并行处理提交给单个ActionBlock
. 因此,每个队列似乎ActionBlock
解决了我的问题,唯一的缺点是这需要安装和部署 Microsoft 的 TDF 库,我希望有一个纯 .NET 4.0 解决方案。到目前为止,这是候选人接受的答案,除非有人能想出一种方法来使用纯 .NET 4.0 解决方案来做到这一点,该解决方案不会退化为每个队列的工作线程(我已经在使用)。