11

假设有一个工作服务从队列接收消息,从文档数据库中读取具有指定 Id 的产品,根据消息应用一些操作逻辑,最后将更新的产品写回数据库 (a)。

水平缩放写入

当处理不同的产品时,这项工作可以安全地并行完成,因此我们可以水平扩展(b)。但是,如果多个服务实例在同一个产品上工作,我们最终可能会遇到并发问题,或者数据库中的并发异常,在这种情况下,我们应该应用一些重试逻辑(但重试仍然可能再次失败等等) .

问题:我们如何避免这种情况?有没有办法可以确保两个实例不在同一个产品上工作?

示例/用例:一家在线商店在产品 A、产品 B 和产品 C 上进行了一次大促销,在一个小时内结束,数百名客户正在购买。对于每次购买,都会将一条消息排入队列(productId、numberOfItems、price)。 目标:我们如何运行我们的工作服务的三个实例,并确保 productA 的所有消息都将在 instanceA、productB 到 instanceB 和 productC 到 instanceC 中结束(不会导致并发问题)?

注意:我的服务是用 C# 编写的,作为工作角色托管在 Azure 上,我使用 Azure 队列进行消息传递,我正在考虑使用 Mongo 进行存储。此外,实体 ID 是GUID.

它更多的是关于技术/设计,所以如果你使用不同的工具来解决我仍然感兴趣的问题。

4

5 回答 5

2

任何试图将负载分配到同一集合中的不同项目(如订单)的解决方案都注定要失败。原因是,如果您的交易流量很高,则必须开始执行以下操作之一:

  1. 让节点互相交谈 ( hey guys, are anyone working with this?)
  2. 将ID生成划分为段(节点a创建ID 1-1000,节点B 1001-1999)等,然后让他们处理自己的段
  3. 动态地将集合划分为段(并让每个节点处理一个段。

那么这些方法有什么问题呢?

第一种方法是简单地复制数据库中的事务。除非您可以花费大量时间优化策​​略,否则最好依靠交易。

后两个选项会降低性能,因为您必须根据 id 动态路由消息,并且还要在运行时更改策略以包括新插入的消息。它最终会失败。

解决方案

以下是您也可以组合使用的两种解决方案。

自动重试

相反,您在某处有一个从消息队列读取的入口点。

在里面你有这样的东西:

while (true)
{
    var message = queue.Read();
    Process(message);
}

为了获得非常简单的容错能力,您可以做的是在失败时重试:

while (true)
{
    for (i = 0; i < 3; i++)
    {
       try
       {
            var message = queue.Read();
            Process(message);
            break; //exit for loop
       }
       catch (Exception ex)
       {
           //log
           //no throw = for loop runs the next attempt
       }
    }
}

您当然可以捕获数据库异常(或者更确切地说是事务失败)来重播这些消息。

微服务

我知道,微服务是一个流行词。但在这种情况下,这是一个很好的解决方案。与其拥有一个处理所有消息的单一核心,不如将应用程序划分为更小的部分。或者在您的情况下,只需停用某些类型的消息的处理。

如果您有五个节点运行您的应用程序,您可以确保节点 A 接收与订单相关的消息,节点 B 接收与运输等相关的消息。

通过这样做,您仍然可以水平扩展您的应用程序,您不会遇到冲突并且需要很少的努力(更多的消息队列并重新配置每个节点)。

于 2015-03-09T07:16:37.250 回答
1

如果您希望始终使数据库保持最新并且始终与已处理的单元保持一致,那么您将在同一个可变实体上进行多次更新。

为了遵守这一点,您需要序列化同一实体的更新。您可以通过在生产者处对数据进行分区来做到这一点,或者将实体的事件累积在同一队列中,或者使用分布式锁或数据库级别的锁将实体锁定在工作线程中。

您可以使用一个actor模型(在使用akka的java/scala世界中)为每个实体或串行处理它们的实体组创建一个消息队列。

已更新您可以尝试使用.net和此处的akka 端口。在这里,您可以找到一个很好的教程,其中包含有关在 scala 中使用akka 的示例。但是对于一般原则,您应该更多地搜索 [actor model]。尽管如此,它也有缺点。

最后涉及对您的数据进行分区以及为特定实体创建独特的专业工作者(可以在发生故障时重用和/或重新启动)的能力。

于 2015-03-09T07:58:57.620 回答
1

对于这种事情,我使用 blob 租约。基本上,我使用某个已知存储帐户中的实体 ID 创建了一个 blob。当 worker 1 拿起实体时,它会尝试获取 blob 的租约(如果 blob 不存在,则自行创建 blob)。如果两者都成功,那么我允许对消息进行处理。之后总是释放租约。如果我不成功,我将消息转储回队列

我遵循史蒂夫马克思最初描述的方法http://blog.smarx.com/posts/managing-concurrency-in-windows-azure-with-leases 虽然调整为使用新的存储库

评论后编辑:如果您可能有很高的消息都与同一个实体交谈(正如您的推荐所暗示的那样),我会在某处重新设计您的方法..实体结构或消息结构。

例如:考虑 CQRS 设计模式并独立存储来自处理每条消息的更改。因此,产品实体现在是各种工作人员对实体所做的所有更改的集合,依次重新应用并重新水化为单个对象

于 2015-03-08T21:55:35.367 回答
0

我假设您有一种方法可以安全地访问所有工作人员服务的产品队列。鉴于此,避免冲突的一种简单方法是在主队列旁边为每个产品使用全局队列

// Queue[X] is the queue for product X
// QueueMain is the main queue 
DoWork(ProductType X)
{
  if (Queue[X].empty())
  {
    product = QueueMain().pop()
    if (product.type != X)
    {
      Queue[product.type].push(product) 
      return;
    }
  }else
  {
     product = Queue[X].pop()
  }

  //process product...
}

对队列的访问需要是原子的

于 2015-04-04T11:39:17.087 回答
-1

1)我能想到的每一个大规模数据解决方案都有内置的东西来精确处理这种冲突。详细信息将取决于您对数据存储的最终选择。在传统的关系数据库的情况下,无需您进行任何附加工作即可完成此操作。有关适当的详细信息,请参阅您选择的技术文档。

2) 了解您的数据模型和使用模式。适当地设计您的数据存储。不要为你没有的规模设计。针对您最常见的使用模式进行优化。

3)挑战你的假设。你真的需要频繁地从多个角色中改变同一个实体吗?有时答案是肯定的,但通常您可以简单地创建一个类似的新实体来反映更新。IE,采用日志/日志记录方法而不是单一实体方法。最终,单个实体上的大量更新永远不会扩展。

于 2015-03-08T20:13:20.320 回答