0

我要解决的问题是(据我所知)一个典型的生产者/消费者问题。我们有 24/7/365 的数据传入。传入的数据(称为原始数据)存储在表中,最终用户无法使用。然后我们选择所有未处理的原始数据,并开始一一处理。处理完每个数据单元后,将其存储在另一个表中,现在可以由客户端应用程序使用。从加载原始数据到持久化处理数据的过程平均需要 2 - 5 秒。但它高度依赖于我们用来处理数据的第三方网络服务。如果 Web 服务速度很慢,我们处理数据的速度就不再像获取数据并积累积压数据那样快,从而导致我们的客户失去实时提要。我们想让这个过程成为一个多线程的。

  1. LOADING - 一个加载器任务(生产者),它无限期地运行并将未处理的数据从 DB 加载到BlockingCollection<T>(或并发集合的一些其他变体)。我的选择BlockingCollection是因为它在设计时考虑了生产者/消费者模式并提供了GetConsumingEnumerable()方法。

  2. PROCESSING - 使用上述数据的多个消费者BlockingCollection<T>。在其当前的实现中,我有一个Parallel.ForEach循环GetConsumingEnumerable(),每次迭代都会启动一个具有两个任务延续的任务:任务的第一步是调用第三方 Web 服务,等待结果并输出结果以供第二个任务使用。第二个任务根据第一个任务的输出进行计算并输出第三个任务的结果,第三个任务基本上只是将该结果存储到第二个任务中BlockingCollection<T>(这个是输出集合)。所以我的消费者实际上也是生产者。理想情况下,任务 1 已加载的每个数据单元都将排队等待并行处理。

  3. PERSISTING - 单个消费者针对上述第二个消费者运行BlockingCollection并将处理过的数据保存到数据库中。

我面临的问题是上面列表中的第 2 项。它似乎不够快(仅通过使用Parallel.ForEach)。我尝试在内部Parallel.ForEach而不是直接启动一个带有延续的任务,而是启动一个包装线程,该线程将依次启动处理任务。但这导致 OutOfMemory 异常,因为线程数失控并很快达到 1200。我还尝试使用 ThreadPool 安排工作,但无济于事。

您能否告知我的方法是否足以满足我们需要做的事情,或者有更好的方法吗?

4

2 回答 2

3

如果瓶颈是某个 3rd 方服务,并且这不会处理并行执行,但会将您的请求排队,那么您将无能为力。

但首先你可以试试这个:

  • 使用线程池或任务(它们也会使用线程池) - 不要自己启动线程
  • 尝试使您的请求异步而不是专门使用线程
  • 通过性能分析器运行您的服务/应用程序并检查您在哪里“浪费”您的时间
  • 对第 3 方服务进行尖峰/检查,看看它如何处理并行请求
  • 考虑缓存来自该服务的答案(如果可能)

这就是我现在能想到的,没有更多信息。

于 2012-08-30T11:36:19.077 回答
2

我最近遇到了一个与您的问题非常相似的问题,这是我所做的,希望对您有所帮助:

  1. 看起来你的第一和第三部分相当简单,可以毫无问题地在各自的线程上进行管理,
  2. 第二部分必须首先在一个新线程上启动,然后使用 System.Threading.timer 进行 Web 服务调用,调用 Web 服务的方法通过异步调用将响应(结果)传递给处理方法让它按照自己的节奏处理数据,

这解决了我的问题,我希望它也能帮助你,如果有任何疑问问我,我会在这里解释......

于 2012-08-30T12:12:35.243 回答