1

我正在尝试等待异步代码的条件,这是代码段:

condition = Celluloid::Condition.new   
Rails.logger.debug "Sending RPC request to #{subject}"
NATS.start uri: ENV['NATS_SERVER'] do 
  Rails.logger.debug "Connected to #{ENV['NATS_SERVER']}"
  sid = NATS.request(subject,msg) do |response|
    Rails.logger.debug "Assigning response"
    condition.signal response
    NATS.stop
  end
  NATS.timeout(sid, 1) do
    NATS.stop
    condition.signal ASYNC_ERROR
    raise "One second timeout waiting for a NATS RPC reply from #{subject}"
  end
end

result = condition.wait
if result = ASYNC_ERROR
  raise "Error in RPC call"
else
  return result
end

我得到了例外Celluloid::Condition signaled spuriously,但没有额外的信息,我真的不明白为什么会导致它,并且https://github.com/celluloid/celluloid/wiki/Conditions没有提供更多信息。

为什么会出现这种情况,我该如何解决?

4

2 回答 2

2

并使用条件完全按照您提出的问题回答您的问题。

在演员的范围内:

class Nats
  include Celluloid

  def initialize
    @condition = Celluloid::Condition.new
  end

  def start
    Rails.logger.debug "Sending RPC request to #{subject}"
    NATS.start uri: ENV['NATS_SERVER'] do 
      Rails.logger.debug "Connected to #{ENV['NATS_SERVER']}"
      sid = NATS.request(subject,msg) do |response|
        Rails.logger.debug "Assigning response"
        @condition.signal response
        NATS.stop
      end
      NATS.timeout(sid, 1) do
        NATS.stop
        @condition.signal ASYNC_ERROR
        raise "One second timeout waiting for a NATS RPC reply from #{subject}"
      end
    end
  end

  def value
    @condition.wait
  end
end

nats = Nats.new
nats.async.start

result = nats.value
if result = ASYNC_ERROR
  raise "Error in RPC call"
else
  return result
end

这甚至更少经过测试,但是如果您不打算使用Future我的其他答案,则应该向您展示基本方法。

于 2015-07-11T18:28:56.607 回答
1

你的情况不是在演员的背景下运作的。请注意,在示例中,使用了一个演员。可以避免使用参与者上下文,但这是非常不同的,并且是您收到错误的根源。

如果你不想在一个actor中实现这个,就像例子中那样,这里有一个你可以在没有actor的情况下实现的方法:

使用Celluloid::Future(即使嵌套async调用在里面)

nats = Celluloid::Future.new {
    blocker = Queue.new
    Rails.logger.debug "Sending RPC request to #{subject}"
    NATS.start uri: ENV['NATS_SERVER'] do 
        Rails.logger.debug "Connected to #{ENV['NATS_SERVER']}"
        sid = NATS.request(subject,msg) do |response|
            Rails.logger.debug "Assigning response"
            NATS.stop
            blocker << response
        end
        NATS.timeout(sid, 1) do
            NATS.stop
            blocker << nil
            #de "One second timeout waiting for a NATS RPC reply from #{subject}"
        end
    end
    blocker.pop || raise ASYNC_ERROR
}

begin
    result = nats.value
rescue ASYNC_ERROR
    raise "Error in RPC call"
rescue => ex
    #de Other exception
else
    return result
end

以上是实现async响应收集的一个松散示例,带有异常处理。这是许多可能的方法中的一个例子。

于 2015-07-11T14:04:11.167 回答