0

我在赛璐珞中实现了一个简单的程序,理想情况下它将并行运行几个演员,每个演员都会计算一些东西,然后将其结果发送回主要演员,其工作只是汇总结果。

这个 FAQ之后,我介绍了一个SupervisionGroup,像这样:

module Shuffling           
  class AggregatorActor
    include Celluloid      

    def initialize(shufflers)
      @shufflerset = shufflers
      @results = {}        
    end

    def add_result(result)
      @results.merge! result

      @shufflerset = @shufflerset - result.keys

      if @shufflerset.empty?
        self.output
        self.terminate
      end
    end

    def output
      puts @results
    end
  end

  class EvalActor
    include Celluloid

    def initialize(shufflerClass)
      @shuffler = shufflerClass.new
      self.async.runEvaluation
    end

    def runEvaluation
      # computation here, which yields result
      Celluloid::Actor[:aggregator].async.add_result(result)
      self.terminate
    end
  end

  class ShufflerSupervisionGroup < Celluloid::SupervisionGroup
    shufflers = [RubyShuffler, PileShuffle, VariablePileShuffle, VariablePileShuffleHuman].to_set

    supervise AggregatorActor, as: :aggregator, args: [shufflers.map { |sh| sh.new.name }]

    shufflers.each do |shuffler|
      supervise EvalActor, as: shuffler.name.to_sym, args: [shuffler]
    end
  end

  ShufflerSupervisionGroup.run
end

EvalActor在他们完成后终止 s,AggregatorActor当所有工作人员都完成后我也终止。

但是,监督线程保持活动状态并使主线程保持活动状态。程序永远不会终止。

如果我发送.run!到该组,那么主线程会在它之后立即终止,并且没有任何作用。

终止后,我可以做些什么来终止组(或者,用组术语,finalize我想)AggregatorActor

4

1 回答 1

0

毕竟,我所做的是将其更改AggregatorActorwait_for_results

class AggregatorActor
  include Celluloid

  def initialize(shufflers)
    @shufflerset = shufflers
    @results = {}
  end

  def wait_for_results
    sleep 5 while not @shufflerset.empty?

    self.output
    self.terminate
  end

  def add_result(result)
    @results.merge! result

    @shufflerset = @shufflerset - result.keys

    puts "Results for #{result.keys.inspect} recorded, remaining: #{@shufflerset.inspect}"
  end

  def output
    puts @results
  end
end

然后我摆脱了SupervisionGroup(因为我不需要监督,即重新运行失败的演员),我像这样使用它:

shufflers = [RubyShuffler, PileShuffle, VariablePileShuffle, VariablePileShuffleHuman, RiffleShuffle].to_set

Celluloid::Actor[:aggregator] = AggregatorActor.new(shufflers.map { |sh| sh.new.name })

shufflers.each do |shuffler|
  Celluloid::Actor[shuffler.name.to_sym] = EvalActor.new shuffler
end

Celluloid::Actor[:aggregator].wait_for_results

感觉不是很干净,如果有更清洁的方法会很好,但至少这是可行的。

于 2014-05-07T10:13:12.157 回答