我在玩赛璐珞宝石。该示例运行良好,但是当我按 Ctrl-C 时,我收到了意外消息:
^CD, [2015-10-07T09:53:19.784411 #16326] DEBUG -- : Terminating 8 actors...
几秒钟后,我得到了错误:
E, [2015-10-07T09:53:29.785162 #16326] ERROR -- : Couldn't cleanly terminate all actors in 10 seconds!
/usr/local/rvm/gems/ruby-2.0.0-p353/gems/eventmachine-1.0.7/lib/eventmachine.rb:187:in `run_machine': Interrupt
from /usr/local/rvm/gems/ruby-2.0.0-p353/gems/eventmachine-1.0.7/lib/eventmachine.rb:187:in `run'
奇怪的是我只创建了 4 个演员,而不是 8 个,而且我的 TERM,INT 信号处理程序没有被调用。
#!/usr/bin/env ruby
require './config/environment'
opts = CommandlineOptions.new.to_h
iface = opts[:iface] || '0.0.0.0'
port = opts[:port] || 3000
App.logger.info('Starting communication server')
connections = Connections.new
local_inbox = LocalQueue.new
auth_server = AuthServer.new(connections, local_inbox)
inbox_service = InboxService.new('inbox', iface, port)
inbox_service.async.process_inbox(local_inbox) # <--------
remote_outbox_name = "outbox_#{iface}:#{port}"
outbox_service = OutboxService.new(connections)
outbox_service.async.subscribe(remote_outbox_name) # <--------
conn_server_opts = { host: iface, port: port }
conn_server_opts.merge!(auth_server.callbacks)
conn_server = ConnServer.new(conn_server_opts)
%W(INT TERM).each do |signal|
trap(signal) do
info("Shutting down...")
conn_server.stop
end
end
conn_server.start
这里 InboxService 是一个创建另一个演员的演员 - 有 2 个演员,然后 OutboxService 也创建了一个演员,所以我创建了 4 个演员。
require 'redis'
require 'celluloid/current'
class InboxServiceActor
include Celluloid
def initialize(remote_inbox_name)
@remote_inbox_name = remote_inbox_name
create_redis_connection
end
def publish(full_msg)
@redis.publish(@remote_inbox_name, full_msg)
end
private
def create_redis_connection
@redis = Redis.new
end
end
require 'json'
require 'redis'
require 'celluloid/current'
class OutboxServiceActor
include Celluloid
include HasLoggerMethods
def initialize
create_redis_connection
end
def subscribe(remote_outbox_name, &block)
@redis.subscribe(remote_outbox_name) do |on|
on.message do |_channel, full_msg|
debug("Outbox message received: '#{full_msg}'")
hash = parse_msg(full_msg)
block.call(hash['signature'], hash['msg']) if message_valid?(hash)
end
end
end
private
def create_redis_connection
@redis = Redis.new
end
def parse_msg(full_msg)
JSON.parse(full_msg)
rescue JSON::ParserError
error('Outbox message JSON parse error')
nil
end
def message_valid?(msg)
msg.is_a?(Hash) && msg.key?('signature') && msg.key?('msg') ||
error('Invalid outbox message. Should '\
'contain "signature" and "msg" keys') && false
end
end