7

我有一个数据库表,其中包含一些要处理的记录。该表有一个标志列,表示以下状态值。1 - 准备好处理,2 - 成功处理,3 - 处理失败。

.net 代码(重复过程 - 控制台/服务)将获取准备好处理的记录列表,并遍历它们并尝试处理它们(不是很长),根据成功或失败更新状态。

为了获得更好的性能,我想为此进程启用多线程。我正在考虑产生 6 个线程,每个线程抓取一个子集。

显然我想避免让不同的线程处理相同的记录。我不想在数据库中有一个“正在处理”标志来处理线程崩溃导致记录挂起的情况。

我看到这样做的唯一方法是获取可用记录的完整列表并为每个线程分配一个组(可能是 id)。如果一个单独的线程失败,它的未处理记录将在下一次进程运行时被拾取。

在将组分配给线程之前,是否有其他替代方法可以划分组?

4

3 回答 3

6

实现此要求的最直接方法是使用任务并行库的

Parallel.ForEach(或Parallel.For)。

允许它管理单个工作线程。

根据经验,我建议如下:

  • 有一个附加状态“处理中”
  • 在数据库中有一个列,指示何时选择记录进行处理,以及定期运行的清理任务/进程,以查找“正在处理”时间过长的记录(将状态重置为“准备处理”)。
  • 即使您不希望它,“正在处理”对于崩溃恢复方案也是必不可少的(除非您可以容忍同一记录被处理两次)。

或者

考虑使用事务队列(想到 MSMQ 或 Rabbit MQ)。它们针对这个问题进行了优化。

那将是我明确的选择,因为两者都大规模完成。

优化

如果从数据库中检索数据需要花费大量时间,您可以考虑使用生产者/消费者模式,使用BlockingCollection实现该模式非常简单。该模式允许一个线程(生产者)使用要处理的数据库记录填充队列,并允许多个其他线程(消费者)处理该队列中的项目。

一种新的选择

鉴于在记录被认为完成之前有几个处理步骤会触及记录,请查看Windows Workflow Foundation作为可能的替代方案。

于 2012-06-06T19:16:47.423 回答
2

我记得做了类似你描述的事情......一个线程不时检查数据库中是否有需要处理的新内容。它只会加载新的 id,所以如果在 x 时最后一次读取的 id 为 1000,则在 x+1 时将从 id 1001 读取。

它读取的所有内容都进入线程安全队列。将项目添加到此队列时,您会通知工作线程(可能使用自动重置事件,或在此处生成线程)。每个线程将一次从该线程安全队列中读取一项,直到队列被清空。

您不应该在 foreach 线程工作之前分配(除非您知道 foreach 文件的进程需要相同的时间)。如果一个线程完成了工作,那么它应该从剩下的其他线程那里承担负载。使用这个线程安全队列,您可以确保这一点。

于 2012-06-06T19:20:08.777 回答
0

这是一种不依赖/使用额外的数据库列(但请参见#4)或要求进程内队列的方法。这种方法的前提是根据一些一致的值在工作人员之间“分片”记录,就像分布式缓存一样。

以下是我的假设:

  1. 再加工不会造成不必要的副作用;最多一些工作“被浪费了”。
  2. 线程数在启动时是固定的。这不是必需的,但它确实简化了实现,并允许我在下面的简单描述中跳过暂时的细节。
  3. 只有一个“工作进程”(但参见#1)控制“工作线程”。这简化了如何在工作人员之间拆分记录的处理。
  4. 有一些[不可变的]“ID”列“分布良好”。这是必需的,因此搜索工作者可以获得大约相同数量的工作。
  5. 只要“最终完成”,工作就可以“乱序”完成。此外,工作人员可能并不总是“100%”运行,因为每个工作人员都有效地在不同的队列上工作。

从 为每个线程分配一个唯一bucket[0, thread_count)。如果一个线程死亡/重新启动,它将占用与它腾出的相同的存储桶。

然后,每次线程需要一条新记录时,它都会从数据库中获取:

SELECT *
FROM record
WHERE state = 'unprocessed'
AND (id % $thread_count) = $bucket
ORDER BY date

当然,对于批量读取“此线程任务”并将它们存储在本地,可能还有其他假设。但是,本地队列将是每个线程的(因此在新线程启动时重新加载),因此它只会处理与给定bucket.

当线程完成处理记录时,应使用适当的隔离级别和/或乐观并发将记录标记为已处理,然后继续处理下一条记录。

于 2012-06-06T19:46:11.650 回答