我要解决的问题是(据我所知)一个典型的生产者/消费者问题。我们有 24/7/365 的数据传入。传入的数据(称为原始数据)存储在表中,最终用户无法使用。然后我们选择所有未处理的原始数据,并开始一一处理。处理完每个数据单元后,将其存储在另一个表中,现在可以由客户端应用程序使用。从加载原始数据到持久化处理数据的过程平均需要 2 - 5 秒。但它高度依赖于我们用来处理数据的第三方网络服务。如果 Web 服务速度很慢,我们处理数据的速度就不再像获取数据并积累积压数据那样快,从而导致我们的客户失去实时提要。我们想让这个过程成为一个多线程的。
LOADING - 一个加载器任务(生产者),它无限期地运行并将未处理的数据从 DB 加载到
BlockingCollection<T>
(或并发集合的一些其他变体)。我的选择BlockingCollection
是因为它在设计时考虑了生产者/消费者模式并提供了GetConsumingEnumerable()
方法。PROCESSING - 使用上述数据的多个消费者
BlockingCollection<T>
。在其当前的实现中,我有一个Parallel.ForEach
循环GetConsumingEnumerable()
,每次迭代都会启动一个具有两个任务延续的任务:任务的第一步是调用第三方 Web 服务,等待结果并输出结果以供第二个任务使用。第二个任务根据第一个任务的输出进行计算并输出第三个任务的结果,第三个任务基本上只是将该结果存储到第二个任务中BlockingCollection<T>
(这个是输出集合)。所以我的消费者实际上也是生产者。理想情况下,任务 1 已加载的每个数据单元都将排队等待并行处理。PERSISTING - 单个消费者针对上述第二个消费者运行
BlockingCollection
并将处理过的数据保存到数据库中。
我面临的问题是上面列表中的第 2 项。它似乎不够快(仅通过使用Parallel.ForEach
)。我尝试在内部Parallel.ForEach
而不是直接启动一个带有延续的任务,而是启动一个包装线程,该线程将依次启动处理任务。但这导致 OutOfMemory 异常,因为线程数失控并很快达到 1200。我还尝试使用 ThreadPool 安排工作,但无济于事。
您能否告知我的方法是否足以满足我们需要做的事情,或者有更好的方法吗?