1

我正在试用 Celluloid 这个 gem。示例运行良好,但是当我按下 Ctrl-C 时,却收到了一条意外的消息:

^CD, [2015-10-07T09:53:19.784411 #16326] DEBUG -- : Terminating 8 actors...

几秒钟后,我得到了错误:

E, [2015-10-07T09:53:29.785162 #16326] ERROR -- : Couldn't cleanly terminate all actors in 10 seconds!
/usr/local/rvm/gems/ruby-2.0.0-p353/gems/eventmachine-1.0.7/lib/eventmachine.rb:187:in `run_machine': Interrupt
        from /usr/local/rvm/gems/ruby-2.0.0-p353/gems/eventmachine-1.0.7/lib/eventmachine.rb:187:in `run'

奇怪的是,我只创建了 4 个 actor,而不是 8 个,而且我为 TERM、INT 信号注册的处理程序也没有被调用。

#!/usr/bin/env ruby

# Launcher for the communication server: builds the inbox/outbox services,
# wires the auth callbacks into the connection server and runs it until an
# INT or TERM signal asks it to stop.

require './config/environment'

opts = CommandlineOptions.new.to_h
iface = opts[:iface] || '0.0.0.0' # listen on all interfaces unless told otherwise
port  = opts[:port] || 3000

App.logger.info('Starting communication server')

connections = Connections.new
local_inbox = LocalQueue.new
auth_server = AuthServer.new(connections, local_inbox)

# Invoked through the `async` proxy, so (presumably, per Celluloid's async
# semantics) the call is queued on the actor and does not block this script.
inbox_service = InboxService.new('inbox', iface, port)
inbox_service.async.process_inbox(local_inbox)

# The outbox channel name is derived from the address this server listens on.
remote_outbox_name = "outbox_#{iface}:#{port}"
outbox_service = OutboxService.new(connections)
outbox_service.async.subscribe(remote_outbox_name)

conn_server_opts = { host: iface, port: port }
conn_server_opts.merge!(auth_server.callbacks)
conn_server = ConnServer.new(conn_server_opts)

%w(INT TERM).each do |signal|
  trap(signal) do
    # Ruby >= 2.0 forbids locking a Mutex inside a trap handler, and a
    # stdlib-style logger takes one on every write, so the message is emitted
    # from a short-lived thread. It goes through App.logger, like the startup
    # message above, instead of a bare `info` that this script never defines.
    Thread.new { App.logger.info('Shutting down...') }
    conn_server.stop
  end
end

# NOTE(review): the handlers are installed before `start`; if ConnServer#start
# (or a library it boots) installs its own INT/TERM traps, those would replace
# the ones above - confirm against ConnServer.
conn_server.start

这里的 InboxService 本身是一个 actor,它还会再创建另一个 actor,这样就是 2 个 actor;OutboxService 同样也会创建一个 actor,所以我总共只创建了 4 个 actor。

require 'redis'
require 'celluloid/current'

# Celluloid actor that publishes messages to a single, fixed Redis channel.
class InboxServiceActor
  include Celluloid

  # @param remote_inbox_name [String] name of the Redis channel every
  #   published message is sent to
  def initialize(remote_inbox_name)
    @remote_inbox_name = remote_inbox_name
    # Each actor owns its own Redis client, built with the default settings.
    @redis = Redis.new
  end

  # Publishes +full_msg+ on the channel chosen at construction time.
  #
  # @param full_msg [String] payload handed to Redis unchanged
  # @return whatever Redis#publish returns
  def publish(full_msg)
    channel = @remote_inbox_name
    @redis.publish(channel, full_msg)
  end
end

require 'json'
require 'redis'
require 'celluloid/current'

# Celluloid actor that listens on a Redis pub/sub channel and hands every
# well-formed outbox message to the block given to #subscribe.
class OutboxServiceActor
  include Celluloid
  include HasLoggerMethods

  def initialize
    create_redis_connection
  end

  # Subscribes to +remote_outbox_name+ and yields the "signature" and "msg"
  # fields of each valid message to +block+.
  #
  # Messages are parsed and validated (and problems logged) even when no block
  # is given; in that case valid messages are simply dropped instead of
  # raising NoMethodError on a nil block.
  #
  # NOTE(review): redis-rb's #subscribe blocks the calling thread until the
  # client unsubscribes, so while subscribed this actor presumably cannot
  # process other messages, including a terminate request - confirm, since it
  # would explain a slow or failed actor shutdown.
  #
  # @param remote_outbox_name [String] Redis channel to listen on
  # @yieldparam signature the message's "signature" value
  # @yieldparam msg the message's "msg" value
  def subscribe(remote_outbox_name, &block)
    @redis.subscribe(remote_outbox_name) do |on|
      on.message do |_channel, full_msg|
        debug("Outbox message received: '#{full_msg}'")

        hash = parse_msg(full_msg)
        next unless message_valid?(hash)

        block.call(hash['signature'], hash['msg']) if block
      end
    end
  end

  private

  # Builds this actor's Redis client with the default settings.
  def create_redis_connection
    @redis = Redis.new
  end

  # Decodes a raw JSON payload.
  #
  # @return the parsed JSON value, or nil (after logging) when the payload is
  #   not valid JSON
  def parse_msg(full_msg)
    JSON.parse(full_msg)
  rescue JSON::ParserError
    error('Outbox message JSON parse error')
    nil
  end

  # Checks that +msg+ is a Hash carrying both required keys and logs an error
  # otherwise.
  #
  # @return [Boolean] always a real boolean, independent of what the logger's
  #   #error happens to return
  def message_valid?(msg)
    return true if msg.is_a?(Hash) && msg.key?('signature') && msg.key?('msg')

    error('Invalid outbox message. Should '\
          'contain "signature" and "msg" keys')
    false
  end
end
4

0 回答 0