正如您在问题中所说,重点应该放在可测试性上,而不是性能上。
我建议一个生产者/消费者模型。您可以在数据库上写入任意数量的线程(新行),并让数据库服务器处理并发。这是系统的第一部分,许多线程将行泵入表中。
为了每行只处理一次,我建议一个线程负责加载新行并将它们泵入队列。然后,您可以拥有任意数量的线程来处理队列。当他们完成处理时,他们可以更新数据库上的行或写入输出队列,另一个线程将在其中批量收集和处理更新请求。
假设您的表中有一个 PROCESSING_STATUS 列,并且新行的 PROCESSING_STATUS = 0。因此线程可以自由地将新行添加到该表中。另一个线程将连续查询(以预定义的时间间隔/事件或简单地轮询)该表,选择 PROCESSING_STATUS = 0 的所有行。然后,将每一行添加到队列中。加载后,您可以将 PROCESSING_STATUS 更新为 1。您必须在再次查询之前完成此操作,这对于避免两次加载同一行很重要。
真正的工作线程会消耗这个队列,我假设你正在使用一个并发队列或类似的结构,能够处理许多消费者。队列算法应该保证只有一个线程可以获取相同的元素。这种 Queue 在 Python、C# 或 Java 的标准库中很容易找到。然后,真正的线程将处理这一行并将它们写回输出队列。
负责写回行的线程将更新工作线程产生的数据和 PROCESSING_STATUS 列,例如将其设置为 2。应该使用行的所有已知键和值来完成此更新,以确保它在读取后没有更改。写入线程还应检查更新查询中受影响行的值,以检查该行是否在处理后未被删除或更改。
关于可测试性,您可以检查是否有未处理的行,检查它们的 PROCESSING_STATUS 列。如果 PROCESSING_STATUS=0 - 此行未加载,如果等于 1,则已加载但未处理/写回。2 表示已处理。您仍然必须检查每一行的处理是否正确完成,但这是标准测试。
您可以检查是否有多个线程尝试访问同一行,或者该行是否在第一次读取后发生更改,检查更新语句中受影响的行。如果更新没有影响任何行,则意味着它已经被处理或更改。
因此,在这种情况下,可测试性的关键是使用队列进行线程同步并检查您对数据库的更新。您还可以在队列和处理线程上使用计数器来检查是否加载的行数 = 已处理的行数 = 写入的行数。
如果您希望多个线程从数据库加载数据,您也可以扩展 PROCESSING_STATUS 列的使用。想象一下,未处理的(新)行将被添加到 PROCESSING_STATUS = 0。然后,一组读取线程,每个线程都有一个唯一的正数且不为 0,将更新与有限的选择语句结合起来。就像是:
update TABLE_X set PROCESSING_STATUS = MY_UNIQUE_THREAD_ID
where key in (select key from TABLE_X where PROCESSING_STATUS = 0 LIMIT 5)
and PROCESSING_STATUS = 0
如果受影响的行不为零,则该线程将加载一些行。下一步是加载 PROCESSING_STATUS = MY_UNIQUE_THREAD_ID 的所有行。然后,可以再次使用相同的算法。当处理一行时,我们用负值 MY_UNIQUE_THREAD_ID 更新它的 PROCESSING_STATUS。这样您就可以使用数据库为您处理并发,但这并不意味着您将获得最佳性能。至少,最初的问题将得到解决:每行只处理一次。
只加载一行而不给数据库服务器带来压力的另一种方法是对键(如果它是串行键)使用模运算。在 select 语句的键 (k % n_readers) 上取模。装载:
SELECT * from TABLE_X WHERE (key % N) == MY_UNIQUE_THREAD_ID