5

我正在开发一个系统,该系统包括:

  1. 一个数据库
  2. 一堆线程将行泵入这个数据库
  3. 一堆线程在这些行上工作

关键点是不应该有超过一个工人同时在一行上操作。

当然这里有一堆策略......我可以在数据库级别锁定东西,我可以使用互斥锁等。

但无论我如何实现这一点,我都需要能够测试系统以确保我做对了。

测试这个的正确方法是什么?

我所做的只是运行数百个线程并不断检查意外重叠。问题是,它是命中还是错过。是概率。即使我运行 500 个线程一个小时,仍然可能有一个线程与另一个线程重叠,只是很少。

另外,我如何正确检查重叠?“重叠检查器”本身具有有限的分辨率,可能会丢失正在发生的实际重叠......

我知道并发是一个复杂的话题,但肯定有一些最佳实践或推荐的方法来测试这样一个系统,除了长时间运行它并交叉手指......

4

10 回答 10

4

您应该只依赖数据库,无需交叉手指:这是任何支持事务/ACID(维基百科)的真实数据库的核心功能。我错过了什么吗?'线程可以重叠'在同一行上工作是什么意思?

于 2012-09-22T11:55:48.560 回答
0

也许我不完全了解您的情况,但以下是我使用 C# 中的多个线程将数千条记录保存到数据库的内容

为了将多条记录同时保存到数据库而不会出现死锁情况,您可以使用 SQLBulkCopy (ADO.Net) 或批量插入 (SQL Server) 实用程序。

它们保持并发并且永远不会导致死锁情况。

对于日志记录,我编写了一个单例类并将其对象传递给每个线程,回到代码中,我使用 ConcurrentList 和 Lock(object) 来存储所有日志,每 5 秒后我将所有这些记录保存到数据库并清理列表,再次使用 SQLBulkCopy 命令。

如果您需要更多信息,请告诉我...

于 2012-09-18T08:33:36.157 回答
0

正如您在问题中所说,重点应该放在可测试性上,而不是性能上。

我建议一个生产者/消费者模型。您可以在数据库上写入任意数量的线程(新行),并让数据库服务器处理并发。这是系统的第一部分,许多线程将行泵入表中。

为了每行只处理一次,我建议一个线程负责加载新行并将它们泵入队列。然后,您可以拥有任意数量的线程来处理队列。当他们完成处理时,他们可以更新数据库上的行或写入输出队列,另一个线程将在其中批量收集和处理更新请求。

假设您的表中有一个 PROCESSING_STATUS 列,并且新行的 PROCESSING_STATUS = 0。因此线程可以自由地将新行添加到该表中。另一个线程将连续查询(以预定义的时间间隔/事件或简单地轮询)该表,选择 PROCESSING_STATUS = 0 的所有行。然后,将每一行添加到队列中。加载后,您可以将 PROCESSING_STATUS 更新为 1。您必须在再次查询之前完成此操作,这对于避免两次加载同一行很重要。

真正的工作线程会消耗这个队列,我假设你正在使用一个并发队列或类似的结构,能够处理许多消费者。队列算法应该保证只有一个线程可以获取相同的元素。这种 Queue 在 Python、C# 或 Java 的标准库中很容易找到。然后,真正的线程将处理这一行并将它们写回输出队列。

负责写回行的线程将更新工作线程产生的数据和 PROCESSING_STATUS 列,例如将其设置为 2。应该使用行的所有已知键和值来完成此更新,以确保它在读取后没有更改。写入线程还应检查更新查询中受影响行的值,以检查该行是否在处理后未被删除或更改。

关于可测试性,您可以检查是否有未处理的行,检查它们的 PROCESSING_STATUS 列。如果 PROCESSING_STATUS=0 - 此行未加载,如果等于 1,则已加载但未处理/写回。2 表示已处理。您仍然必须检查每一行的处理是否正确完成,但这是标准测试。

您可以检查是否有多个线程尝试访问同一行,或者该行是否在第一次读取后发生更改,检查更新语句中受影响的行。如果更新没有影响任何行,则意味着它已经被处理或更改。

因此,在这种情况下,可测试性的关键是使用队列进行线程同步并检查您对数据库的更新。您还可以在队列和处理线程上使用计数器来检查是否加载的行数 = 已处理的行数 = 写入的行数。

如果您希望多个线程从数据库加载数据,您也可以扩展 PROCESSING_STATUS 列的使用。想象一下,未处理的(新)行将被添加到 PROCESSING_STATUS = 0。然后,一组读取线程,每个线程都有一个唯一的正数且不为 0,将更新与有限的选择语句结合起来。就像是:

update TABLE_X set PROCESSING_STATUS = MY_UNIQUE_THREAD_ID
where key in (select key from TABLE_X where PROCESSING_STATUS = 0 LIMIT 5)
      and PROCESSING_STATUS = 0

如果受影响的行不为零,则该线程将加载一些行。下一步是加载 PROCESSING_STATUS = MY_UNIQUE_THREAD_ID 的所有行。然后,可以再次使用相同的算法。当处理一行时,我们用负值 MY_UNIQUE_THREAD_ID 更新它的 PROCESSING_STATUS。这样您就可以使用数据库为您处理并发,但这并不意味着您将获得最佳性能。至少,最初的问题将得到解决:每行只处理一次。

只加载一行而不给数据库服务器带来压力的另一种方法是对键(如果它是串行键)使用模运算。在 select 语句的键 (k % n_readers) 上取模。装载:

SELECT * from TABLE_X WHERE (key % N) == MY_UNIQUE_THREAD_ID
于 2012-09-20T11:34:48.727 回答
0

与其启动一堆线程以希望触发 Heisenbug,不如看看Chess之类的工具。我必须承认我从未尝试过它,但它看起来仍然适合您的问题,因为它旨在积极探索发生的所有交错。

据研究人员称,国际象棋已被集成到微软内部许多代码库的测试框架中,并且每天都被测试人员使用。

请注意,该项目最初是作为 win32 解决方案开发的,但已移植到.NET。该页面的 codeproject 链接已失效,但快速搜索显示该代码仍然可用

于 2012-09-24T19:34:44.390 回答
0

我建议您提取工作片段并通过 row_id 将它们链接起来,而不是您不必对其进行测试。

在.NET中我会做这样的事情:

private var rowWorkers = new Dictionary<int,Task>();

public void ScheduleWorkOnRow(int id) 
{
  // starting empty worker to be able to continue on it
  if(rowWorkers[id] == null) rowWorkers.Add(id, Task.Run(() => { });
  // scheduling continuation
  rowWorkers[id].ContinueWith(WorkOnRow, id);
}

private void WorkOnRow(Task task, object id)
{
  //your code
}

这个片段远非理想,但我认为你可以明白这一点。

于 2012-09-24T12:53:54.140 回答
0

我正在拆分你的问题。您似乎在这里混淆了两件事。1)如何避免多个线程在同一行上工作?和 2) 如何测试您的应用程序以确保不会发生这种情况?

我没有看到您提到的一件事是如何处理正在写入数据库的线程的顺序。当您处理用户提供的数据时,应用更改的顺序可能非常重要。只是把这个扔在那里以防万一。

如何避免:

您没有说为此使用的是哪个数据库。一些更高端的商业产品具有行锁定和许多您可能会感兴趣的其他功能。你应该和你的 DBA 谈谈,看看他们是否不能帮助你。我完全同意限制这一点的适当位置是在数据库内部。这是唯一一个你可以真正确保捕捉到 100% 的地方。

也就是说,当您绝对必须正确处理时,您应该选择多部分解决方案。然后,如果一件事失败了,其他人也许能够弥补它。因此,添加此处提到的其他一些措施。

如何测试:

创建具有已知重叠数据行的数据集,然后尝试使用 Grimace 的国际象棋工具。但是您需要您知道会导致问题的数据,以便查看代码如何处理它们并查看它是否正确处理它们。不要只是不断地向它扔随机数据,希望有什么能坚持下去。例如,如果你启动了 500 个线程,它们都试图访问同一行,可能是因为你指定了相同的主键,会发生什么?

于 2012-09-24T19:55:04.363 回答
0

就像我之前的其他人所说的那样,唯一正确的方法是对你有用的东西。没有对错,只有好的和更好的。话虽如此:

你的目标是:

关键点是不应该有超过一个工人同时在一行上操作。

因此,您有数千个线程,无论打开多少个线程,都只有一个线程可以执行更改。线程是动态创建的,很难跟踪它们,只会浪费资源和时间。

但是数据库是唯一的,因此我们可以不控制线程,而是限制数据库,并且每个特定行只允许一个线程。为此,我们需要为每列插入一个新的检查器。让我们将列命名为ThreadCheck,它可以是您想要的任何名称,对于此示例,我们将其放入文本。

1 个想法一 - 线程计数器

该列背后的想法是,每个想要对数据库中的某些行进行操作的线程都作为 Thread[Thread ID] 保存到 ThreadCheck 中。当线程完成对行的操作时,ThreadCheck 被保存为空值或特定值。它可以是任何你想要的,因为这个例子让我们把它命名为“空”。所以一个线程完成工作并且值变为空。

2 想法二 - 队列

接下来要实现的是队列。队列的逻辑基于先进先出(先入先出),可以模拟银行等候名单。在一家银行,我们有多个人试图为多个客户提供服务。但是,对于您的应用程序,这家银行只有一个人试图为许多客户(线程)提供服务。随着队列的建立,每个新线程都会获得一张等待票,另一个简单的计数器模拟线程在等待列表中的位置。

当一个线程想要对一行进行操作,并且该行是空的并且该线程有一个值为 counter+1 的等待票时,它可以对其进行操作。当计数器为 1 时,等待线程有计数器 2,下一个线程有计数器 3,直到 X。

现在要向前推进,您需要某种高级人物,例如保安人员或告诉人们“下一步!!!”的经理。在线程等待列表中,这个高级功能可以是一个周期性循环,仅当当前线程完成工作时才将行向前移动。这节省了时间和资源,使循环事件基于而不是不断检查。

您可以使用动态计数器++ 和计数器来增强总队列,并在每次线程完成时为等待票提供一个新值。或者应用一个队列限制,例如一次活动 1234567890 个线程。在队列移动之前,任何新线程都会被拒绝。

3 理念三——总结

创建一个线程并给出一个等待票。当一个线程完成时,服务计数器将该行向上移动一位。一直到最后一个线程。使用线程检查和推送队列的安全功能,存在瓶颈,而不是一大群想要同时操作的线程。

如果我试图阐述的逻辑有问题,或者您不理解某些部分,请评论我。

同样因为您尝试自己解决这个问题,所以我没有故意包含任何代码来帮助您设计自己的代码,并为自己省去尝试从头开始编写东西的麻烦。

于 2012-09-24T17:27:28.013 回答
0

测试这个的正确方法是什么?

这里没有简单的答案。要考虑的一件事是创建一个数据库代理,您可以在工作线程和可以捕获重叠的数据库层之间注入该代理。您可以在某个并发映射中记录客户端操作的行 ID,并在数据库 RPC 调用完成时将其删除。

我将使用 Java 作为伪代码。

 try {
     // add it to some atomic concurrent hash-map
     if (workingRowIdMap.putIfAbsent(rowId, null) != null) {
         // scream and shout and log a concurrency failure
     }
     // do the _real_ database stuff here
 } finally {
     workingRowIdMap.remove(rowId);
 }

如果您使用 SQL 来提取相关数据的行 ID,可能会很困难。如果您正在处理的 SQL 很复杂,这甚至可能是不可能的。如果没有有关您的架构的更多详细信息,很难知道。

此外,您将希望您的代理尽可能轻量级,否则如果添加额外的锁或过多的内存屏障,您可能会隐藏问题。


另一个想法是使用数据库工具查看二进制更改日志,以查看指令中是否有任何重叠。我知道 MySQL 有一些工具可以让你调查他们的二进制日志。然而,定制一个能够确定您的数据更新是否重叠的工具可能需要做很多工作。


另外,我如何正确检查重叠?“重叠检查器”本身具有有限的分辨率,可能会丢失正在发生的实际重叠......

同意。测试错误并不是一门精确的科学——尤其是对于高度并发的软件。我们的想法是尽力而为。我总是尝试模拟实时服务流量,这通常是复制软件在生产中看到的条件的最佳方式。重放生产日志可能能够帮助您检测问题。

除了测试之外,您可能还需要围绕软件的高并发部分进行一些小组代码审查会议。适当地隔离该代码,以便通过良好的 try / finally 块等进行简单锁定,这也是一项不错的投资。

于 2012-09-12T21:06:36.627 回答
0

我发现做这种事情的最好方法是启动大量线程,然后通过随机挂起和恢复线程来引入随机抖动。

这为您提供了许多有趣的线程调度。它是一种逐渐覆盖越来越多搜索空间的蒙特卡罗算法。

并不是说您可以随机调度线程,您需要断言不存在错误。我对您的情况最好的想法:添加一个新列WorkerCount int not null。将其初始化为零。当工作人员在其上运行时,将其递增(不要将其设置为 1 - 递增)。在那里放置一个检查约束来检查WorkerCount IN (0, 1)。当工人完成时,减少计数。

这将在第一次重叠时中断。

于 2012-09-12T21:26:15.280 回答
0

真正测试重叠;

  • 创建一个保证重叠的测试集(例如只输入一个相同的行)
  • 验证您的锁定机制是否正常工作(例如,记录每一行的工作开始和停止)

要测试性能,您应该创建一个以类似生产的方式生成数据的测试集。并使用类似的硬件和...

至于锁定行,如果所有内容都在一个应用程序中,我想我会创建一个 ConcurrentDictionary ,其中包含正在处理的行的 ID 或类似的东西。或者使用一些带有 ConcurrentQueues 的系统,其中行正在通过排队/出队处理。

于 2012-09-21T08:40:09.127 回答