0

我需要在进程中处理队列中的作业,并异步执行 IO。这很简单。问题是这些作业可以将其他项目添加到队列中。

我想我一直在摆弄这个问题太久了,所以我的大脑一片混乱——这应该不会太难。我不断想出一个非此即彼的情况:

  1. 队列可以异步执行作业,然后可以加入结果。
  2. 队列可以同步执行作业,直到最后一个完成并且队列为空。

我一直在摆弄从 EventMachine 和 Goliath(两者都可以使用EM::HttpRequest)到赛璐珞(实际上从来没有用它来构建东西),以及使用 Fibers 编写枚举器的所有东西。不过,我的大脑已经炸了。

简单地说,我想要的是能够做到这一点:

items = [1,2,3]
items.each do |item|
  if item.has_particular_condition? 
    items << item.process_one_way
  elsif item.other_condition?
    items << item.process_another_way
  # ...
  end
end

#=> [1,2,3,4,5,6]

...其中4、5和6都是处理集合中原始项目的结果,而7、8和9是处理4、5和6的结果。我不需要担心无限期处理队列,因为我正在处理的数据将在几次迭代后结束。

欢迎高层次的指导、评论、其他库的链接等,以及低层次的实现代码示例。

4

2 回答 2

0

我过去也有类似的要求,而您需要的是一个可靠的、高性能的工作队列。我建议您查看一年多前我发现的beanstalkd,它一直在用 ruby​​ 可靠地处理成千上万个工作。

特别是,我已经开始围绕 beanstalkd 开发可靠的 ruby​​ 库。特别是,一定要检查backburner,它是使用 beanstalkd 的 ruby​​ 中的生产就绪工作队列。语法和设置很简单,定义作业如何快速处理,处理作业失败和重试都是内置的,作业调度等等。

如果您有任何问题,请告诉我,但我认为 beanstalkd 和 backburner 非常适合您的要求。

于 2012-11-15T11:36:35.977 回答
0

我最终实现了一些不太理想的东西——基本上只是将一个 EM Fiber Iterator 包装在一个循环中,一旦没有新的结果排队就会终止。

require 'set'

class SetRunner
  def initialize(seed_queue)
    @results = seed_queue.to_set
  end

  def run
    begin
      yield last_loop_results, result_bucket
    end until new_loop_results.empty?

    return @results
  end

  def last_loop_results
    result_bucket.shift(result_bucket.count)
  end

  def result_bucket
    @result_bucket ||= @results.to_a
  end

  def new_loop_results
    # .add? returns nil if already in the set
    result_bucket.each { |item| @results.add? item }.compact
  end
end

然后,将它与 EventMachine 一起使用:

queue = [1,2,3]
results = SetRunner.new(queue).run do |set, output|
  EM::Synchrony::FiberIterator.new(set, 3).each do |item|
    output.push(item + 3) if item <= 6
  end
end
# => [1,2,3,4,5,6,7,8,9]

然后每个集合将使用传递给 FiberIterator 的并发级别运行,但每个集合的结果将在外部 SetRunner 循环的下一次迭代中运行。

于 2012-12-03T18:34:07.477 回答