1

我使用 SQL Server 数据库表作为工作队列。

我有一个作家和多个读者。

写入器为队列表 (INSERT) 生成新记录。

每个读取器查找处于特定状态的记录(SELECT)进行消费,获取一批记录,将每条记录标记为拥有(UPDATE),处理它们,然后将所有处理的记录标记为完成或失败(UPDATE)。记录的工作流程很简单:状态从“A”表示已添加,到“B”表示正在处理,再到“C”表示完成或“F”表示失败。并且每条记录的所有者从处于状态“A”时的未设置变为唯一标识符,用于识别剩余步骤的阅读器。唯一标识符(一个十进制)加上状态(nchar(1))加上一个额外的批次 id(int),用于标识写入器的较大批次是处理中的三个关键字段。此外,该表还有一个时间戳字段,LINQ 使用该字段进行并发检查。

我需要防止由于两个读者选择相同的记录来处理而导致锁定延迟和更新失败。作为处理工作的一部分,会进行一个 Web 服务调用,该调用可能需要不确定的时间才能完成。我们通过交易向 Web 服务供应商付款,所以我不想打电话,然后找出另一个处理相同记录的进程。此类错误将使我们损失数千美元。

我读了这篇文章:

使用 READPAST 和 UPDLOCK 处理 SQL Server 中的数据队列

这看起来很有希望,但我决定使用 LINQ to SQL。我读了这篇文章:

使用 UPDLOCK 的 Linq to SQL

后者说没有办法告诉 LINQ 使用 UPDLOCK 除非粗略地包装一个 SQL 过程。这是因为 LINQ 使用乐观并发,而这些功能假设是悲观锁定。

那么如何使用 LINQ 支持的特性来解决这个问题呢?我的应用程序是多线程的,但我可以针对同一个队列表运行多个实例,因此我不能只让一个调度程序将记录交给每个消费者线程。

我正在使用 .NET 4.0。我的应用程序是一个普通的旧 Windows 控制台应用程序。该解决方案应适用于 SQL Server 2005 和 SQL Server 2008 R2。我不想使用消息传递系统。

更新:对 SO 的进一步研究。找到这篇文章:

LINQ to SQL - 队列

答案建议有一个调度程序线程,我排除了。

更新 2:在 SO 上找到另一篇文章:

以原子方式标记并返回数据库中的一组行

这看起来真的很有希望。我需要直接执行 SQL,但是一旦我标记了我的记录并获得了我的 ID,其余的都可以在 L2S 中发生。

4

2 回答 2

0

也许您可以对读者中的出列过程采取更悲观的方法。

让读者接收下一个排队的项目。然后让读者尝试将该项目出列。阅读器可以为出队调用中的项目分配唯一值,例如 guid。

然后,阅读器可以尝试获取它已出列的排队项目。当它收到项目时,它可以比较它已分配的项目的 guid。如果它们相同,那么它就是一个出列,然后读者可以处理。

假设您的读者可以一次出列一项,这应该是可行的。我不确定在每个读者出列多个项目时这会如何工作。

于 2012-11-09T13:46:05.157 回答
0

服务这是我的答案。我将存储过程与我的 DataContext 关联以保留地址。它能够使用悲观并发在单个操作中选择、标记和返回记录。

CREATE PROCEDURE dbo.kccsp_ReserveAddresses
      @BATCH_SIZE int,
      @BATCH_ID int,
      @PROCESS_TOKEN numeric(18,0)
  AS
      -- Reserve a group of addresses in the queue so that a consumer may process them,
      -- but all other consumers will leave them alone.
      -- The query hints are essential to prevent lock contention, 
      -- or concurrency errors from multiple processors handling the same address.
      update TOP (@BATCH_SIZE) 
          KCC_GeoCodingAddressQueue WITH (ROWLOCK, READPAST, UPDLOCK)
      set 
          [Process Status] = 'B', 
          [Process Token] = @PROCESS_TOKEN
      output INSERTED.*
      where [Process Status] = 'A' and [Process Token] = 0

我通过将过程拖到我的 dbml 设计器中的结果表上来做到这一点。这会导致结果集与我的实体类相同,并在标记它们的同时返回我分配的所有记录。SQL 提示处理并发和锁争用。

当我需要在 C# 中调用它时,我会这样做:

   int? batchSize = 50;
   int? batchIdToReserve = Batch.GeoBatchID;
   decimal? processorId = someId;
   var addresses = Context.kccsp_ReserveAddresses(batchSize, batchIdToReserve, processorId);
于 2012-11-09T17:11:35.947 回答