我正在使用 eventmachine 从 HornetQ 主题中读取,推送到由 EM websocket 连接订阅的频道。我需要防止 @topic.receive 循环阻塞,因此创建了一个 proc 并在没有回调的情况下调用 EventMachine.defer。这将无限期地运行。这工作正常。我也可以只使用 Thread.new。
我的问题是,这是从流/队列中读取数据并将数据传递到通道的正确方法吗?还有更好的/其他方法吗?
require 'em-websocket'
require 'torquebox-messaging'
class WebsocketServer
def initialize
@channel = EM::Channel.new
@topic = TorqueBox::Messaging::Topic.new('/topics/mytopic')
end
def start
EventMachine.run do
topic_to_channel = proc do
while true
msg = @topic.receive
@channel.push msg
end
end
EventMachine.defer(topic_to_channel)
EventMachine::WebSocket.start(:host => "127.0.0.1", :port => 8081, :debug => false) do |connection|
connection.onopen do
sid = @channel.subscribe { |msg| connection.send msg }
connection.onclose do
@channel.unsubscribe(sid)
end
end
end
end
end
end
WebsocketServer.new.start