(注意:我使用的是 .Net 4,而不是.Net 4.5,所以我不能使用 TPL 的 DataflowBlock 类。)
TL;DR 版本
最终,我只是在寻找一种使用多个线程来处理顺序工作项的方法,这种方法可以在最终输出中保留它们的顺序,而不需要无界的输出缓冲区。
动机
我有现有的代码来提供一种多线程机制来处理多个数据块,其中一个 I/O 绑定线程(“供应商”)负责将数据块排队以进行处理。这些数据块构成工作项。
一个或多个线程(“处理器”)负责一次使一个工作项出队,它们处理该工作项,然后在出队下一个工作项之前将处理后的数据写入输出队列。
最终的 I/O 绑定线程(“消费者”)负责将已完成的工作项从输出队列中出列并将它们写入最终目的地。这些工作项是(并且必须)按照它们被排队的顺序编写的。我使用并发优先级队列实现了这一点,其中每个项目的优先级由其源索引定义。
我正在使用此方案对大型数据流进行一些自定义压缩,其中压缩本身相对较慢,但未压缩数据的读取和压缩数据的写入相对较快(尽管受 I/O 限制)。
我以 64K 量级的相当大的块处理数据,因此管道的开销相对较小。
我目前的解决方案运行良好,但它涉及到 6 年前使用许多同步事件编写的大量自定义代码,而且设计似乎有些笨拙;因此,我开始进行学术练习,看看是否可以使用更现代的 .Net 库对其进行重写。
新设计
我的新设计使用BlockingCollection<>
该类,并且在某种程度上基于这篇 Microsoft 文章。
特别是,请查看题为“使用多个生产者进行负载平衡”的部分。我尝试过使用这种方法,因此我有几个处理任务,每个任务都从共享输入 BlockingCollection 获取工作项,并将其完成的项目写入自己的 BlockingCollection 输出队列。
因为每个处理任务都有自己的输出队列,所以我试图用BlockingCollection.TakeFromAny()
它来使第一个可用的已完成工作项出列。
多路复用器问题
到目前为止一切顺利,但现在问题来了。微软文章指出:
差距是个问题。流水线的下一个阶段,显示图像阶段,需要按顺序显示图像,并且在序列中没有间隙。这就是多路复用器的用武之地。使用 TakeFromAny 方法,多路复用器等待来自两个过滤阶段生产者队列的输入。当图像到达时,多路复用器查看图像的序列号是否是预期序列中的下一个。如果是,多路复用器将其传递到显示图像阶段。如果图像不是序列中的下一个,则多路复用器将值保存在内部预读缓冲区中,并对没有预读值的输入队列重复执行操作。该算法允许多路复用器将来自传入生产者队列的输入放在一起,以确保按顺序排列而不对值进行排序。
好的,所以发生的事情是处理任务可以以几乎任何顺序生产成品。多路复用器负责以正确的顺序输出这些项目。
然而...
假设我们有 1000 个项目要处理。进一步想象,由于某种奇怪的原因,第一个项目需要更长的时间来处理所有其他项目的组合。
使用我当前的方案,多路复用器将继续从所有处理输出队列中读取和缓冲项目,直到找到它应该输出的下一个。由于它等待的项目(根据我上面的“想象一下”)只会在处理完所有其他工作项目后出现,我将有效地缓冲整个输入中的所有工作项目!
数据量太大,不允许这种情况发生。当输出队列达到某个最大大小(即它是有界输出队列)时,我需要能够停止处理任务输出已完成的工作项,除非工作项恰好是多路复用器正在等待的工作项。
这就是我有点卡住的地方。我可以想到很多方法来实际实现这一点,但它们似乎都过于复杂,以至于它们并不比我想要替换的代码好!
我的问题是什么?
我的问题是:我这样做是否正确?
我本以为这将是一个很好理解的问题,但我的研究只出现了似乎忽略了如果一个工作项与所有其他工作项相比需要很长时间时发生的无限缓冲问题的文章。
任何人都可以指出任何描述实现这一目标的合理方法的文章吗?
TL;DR 版本
最终,我只是在寻找一种使用多个线程来处理顺序工作项的方法,这种方法可以在最终输出中保留它们的顺序,而不需要无界的输出缓冲区。