2

我有几个微服务(用 ruby​​ 实现,尽管我怀疑这对我的问题很重要)。其中一个提供项目,另一个处理它们,然后将它们标记为已处理(通过 DELETE 调用)

提供者有一个/items端点,它以 JSON 格式列出了一堆用 id 标识的项目。它还有一个DELETE /items/id端点,可以从列表中删除一项(可能是因为它已被处理)

“处理器”中的代码(非常简化)如下所示:

items = <GET provider/items>
items.each do |item|
  process item
  <DELETE provider/items/#{item.id}>
end

这有几个问题,但我想解决的一个问题是它不是线程安全的,因此我不能并行运行它。如果两个工人同时开始处理项目,他们将“踩到对方的脚趾”:他们将获得相同的项目列表,然后(尝试)处理并删除每个项目两次。

我可以更改此设置以允许并行处理的最简单方法是什么?

你可以假设我有 ruby​​ 可用。我宁愿将更改保持在最低限度,并且如果可能的话,宁愿不安装其他 gem。Sidekiq可用作消费者的排队系统。

4

3 回答 3

3

一些替代方案(只是头脑风暴):

  1. 只需删除 HTTP 并将 pub-sub 与队列一起使用。让生产者排队项目,许多消费者处理它们(并触发状态更改,在这种情况下使用 HTTP,如果你喜欢它)。
  2. 如果你真的想要 HTTP,我认为有几个缺失的部分。如果您的项目的状态是pendingprocessed,那么您的状态机中有一个隐藏/隐式状态:in_progress(或其他)。一旦你想到它,图片就会变得更清晰:你GET /items不是幂等的(因为它会将项目的状态从挂起更改为进行中),因此首先不应该是 GET。

    一个。另一种方法是添加一个新实体(例如批处理),该实体通过 POST 创建并将一些项目分组并发送它们。已经退回的项目不会成为未来批次的一部分,然后您可以将整个批次标记为已完成(例如PUT /batches/X/done)。这变得非常快,因为您将开始重新实现排队系统和普通/显式(参见 c)HTTP 中已经存在的特性(ack、超时、错误)。

    湾。一个稍微简单的替代方案:只需打开/items一个POST/ PUT(在两种情况下都很奇怪)端点,将项目标记为正在处理(并且不再返回它们,因为它只返回待处理的项目)。但同样的错误和超时问题也适用。

    C。让生产者明确并通过 PUT 向其他服务请求处理项目。您可以在正文中包含所有需要的数据,也可以将其用作 ping 并让处理器通过 GET 请求信息。您可以在任一侧添加异步处理(但在处理器中可能更好)。

我会诚实地做1(除非有令人信服的理由)。

于 2015-01-08T12:58:31.860 回答
2

在我看来,问题在于并行化这个实现是你认为每个线程都会调用:

<GET provider/items>

一种解决方案是先获取所有项目,然后进行异步处理。

我的 Ruby 不存在,但它可能看起来像这样:

class HardWorker
    include Sidekiq::Worker
    def perform(item)
        process item
        <DELETE provider/items/#{item.id}>
    end
end

items = <GET provider/items>

items.each do |item|
    HardWorker.perform_async(item)
end

这样,您的“生产者”就是循环,而消费者就是 async HardWorker

于 2015-01-08T18:04:11.697 回答
1

我可以更改此设置以允许并行处理的最简单方法是什么?

如果你可以升级服务器上的代码,或者添加中间人代码,那么最简单的方法就是队列。

如果您只喜欢客户端,没有中间人,也没有客户对客户的谈话,并且偶尔有一些冗余是可以的,那么这里有一些想法。

  1. 使用 shuffle 减少冲突

    • 如果您的服务器可以接收不存在对象的 DELETE
    • 而且“工艺项”成本+时间比较小
    • 并且该过程与顺序无关
    • 然后你可以打乱项目以减少碰撞:

      items.shuffle.each do |item|
        process item
      
  2. 使用 HEAD 检查项目是否存在

    • 如果你的服务器有 HEAD 方法
    • 并且有办法查找一个项目
    • 与“进程项”相比,HTTP 连接便宜+快速
    • 然后,如果该项目不存在,您可以跳过它:

      items.each do |item|
        next if !<HEAD provider/items/id>
      
  3. 使用轮询循环刷新项目

    • 如果这些项目类似于您轮询正在进行的工作池
    • 并且是顺序独立的
    • 并且 GET 请求是幂等的,即可以多次请求所有项目
    • 并且 DELETE 请求返回一个结果,通知您该项目不存在
    • 然后你可以处理项目直到你达到冗余,然后刷新项目列表:

      loop do
        items = <GET provider/items>
        if items.blank?
          sleep 1
          next
        end
        items.each do |item|
          process item
          <DELETE provider/items/#{item.id}>
          break if DELETE returns a code that indicates "already deleted"
        end
      end
      
  4. 以上所有内容都使用轮询循环、随机播放和 HEAD 检查相结合。

    • 鉴于没有队列、没有中间人,也没有客户对客户的谈话,这非常有效。
    • 当多个客户端检查一个项目是否存在然后开始处理它时,仍然会发生罕见的冗余“处理项目”;在实践中,这几乎是零概率,尤其是当有很多项目时。

      loop do
        items = <GET provider/items>
        if items.blank?
          sleep 1 
          next
        end
        items.shuffle do |item|
          break if !<HEAD provider/items/id>
          process item
          <DELETE provider/items/#{item.id}>
          break if DELETE returns a code that indicates "already deleted"
        end
      end
      
于 2015-01-08T17:02:06.567 回答