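How do I handle timeouts in an EventMachine-based HTTP server? I basically put the HTTP request information on a queue that processes it, and that processing may or may not invoke the callback. I can set the timeout itself, but I haven't figured out how to add a timeout handler or a timeout callback.
I went through the docs but didn't manage to glean anything useful from them. Putting the logic in the unbind method obviously doesn't work, because by the time unbind is called the request is already finished, and adding an EM::error_handler next to the callback creation code doesn't work either.
I would like to catch the timeout event and return a specific JSON response when it happens.
Here is my code, starting with the HTTP request handler: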
class HTTPRequestHandler < EventMachine::Connection
  def initialize(s,q,h)
    @tcpserver = s
    @queue = q
    @callback_hash = h
    self.comm_inactivity_timeout = API_REQUEST_TIMEOUT
  end

  def post_init
    @parser = RequestParser.new
  end

  def receive_data(data)
    handle_http_request if @parser.parse(data)
  end

  def parse_query_parms(query_str)
    begin
      rethash = {}
      query_arr = query_str.split(/&/)
      query_arr.each { |element|
        e_arr = element.split(/\=/)
        rethash[e_arr[0]] = e_arr[1]
      }
      return rethash
    rescue
      return nil
    end
  end

  def handle_http_request
    result = parse_query_parms(@parser.env["QUERY_STRING"]) # hash
    if result
      if result.has_key?('id') and result.has_key?('rid') and result.has_key?('json')
        puts result
        # Callback to handle this
        cb = EM.Callback{ |rid,rtime,msg|
          data = "{\"rid\":\"#{rid}\",\"rtime\":\"#{rtime}\",\"msg\":#{msg}}"
          send_data("HTTP/1.1 200 OK\r\n")
          send_data("Content-Type: application/json\r\n")
          send_data("Content-Length: #{data.bytesize}\r\n")
          send_data("\r\n")
          send_data(data)
          close_connection_after_writing
        }
        # Add callback to hash
        @callback_hash[result['rid']] = cb
        # Decode the JSON passed in the URL
        json_from_api = result['json']
        json_from_api = URI.decode(json_from_api)
        # Push request onto queue
        qreq=QueuedRequest.new(result['id'],json_from_api)
        @queue.push(qreq)
      else
        data = "{\"success\":\"false\",\"response\":\"request needs id, rid, json parameters\"}"
        send_data("HTTP/1.1 200 OK\r\n")
        send_data("Content-Type: application/json\r\n")
        send_data("Content-Length: #{data.bytesize}\r\n")
        send_data("\r\n")
        send_data("#{data}")
        close_connection_after_writing
      end
    else
      data = "{\"success\":\"false\",\"response\":\"unable to parse parameters\"}"
      send_data("HTTP/1.1 200 OK\r\n")
      send_data("Content-Type: application/json\r\n")
      send_data("Content-Length: #{data.bytesize}\r\n")
      send_data("\r\n")
      send_data("#{data}")
      close_connection_after_writing
    end
  end
end
And the main loop, where everything is initialized and the queue is processed:
EM.synchrony do
  h = {} # map of rids -> callbacks for requests
  # Initialize TCP and HTTP servers
  q = EM::Queue.new # Queue of messages from HTTP Server
  s = TCPProxyServer.new(h)
  EM.start_server(LISTEN_HOST_CLIENT, LISTEN_PORT_API, HTTPRequestHandler, s, q, h)
  s.start
  puts "Server starting (http and tcp)"
  # process queue of messages coming in from API (recursive)
  process_queue = Proc.new do |qreq|
    # Our functions
    @operation = lambda do
      puts qreq
      begin
        # Send data to channel
        if s.connections_plug[qreq.id]
          s.connections_plug[qreq.id].send_data(qreq.json)
        else
          return "unable to find id:#{qreq.id} in connection"
        end
      rescue Exception=>e
        puts "Unable to send process queued request! #{e}"
      end
    end
    @callback = lambda { |result| }
    EM::defer(@operation,@callback)
    EM.next_tick{ q.pop(&process_queue) }
  end
  q.pop(&process_queue)
end
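
For reference, here is a rough sketch of the behaviour I am after, not something I have working. The idea is that each request gets its own application-level timer, and whichever fires first (the normal callback or the timer) sends the response while the other becomes a no-op. `OneShotResponder` and `send_json` are hypothetical names made up for this illustration; `EM::Timer.new` and `Timer#cancel` are the EventMachine calls I would expect to use, shown only in comments so the block runs with the standard library alone. The sketch also treats `msg` as a plain value, which is a simplification of the code above.

```ruby
require 'json'

# Hypothetical helper: ensures exactly one response is sent per request,
# whether it comes from the normal callback or from a timeout.
class OneShotResponder
  def initialize(&sender)
    @sender = sender # block that actually writes the response body
    @done = false
  end

  def done?
    @done
  end

  # Normal completion path (what the EM.Callback block would call).
  def succeed(rid, rtime, msg)
    respond(JSON.generate('rid' => rid, 'rtime' => rtime, 'msg' => msg))
  end

  # Timeout path (what the per-request timer would call).
  def time_out(rid)
    respond(JSON.generate('success' => 'false', 'rid' => rid,
                          'response' => 'request timed out'))
  end

  private

  # Sends the body once; later calls return false and send nothing.
  def respond(body)
    return false if @done
    @done = true
    @sender.call(body)
    true
  end
end

# Assumed wiring inside handle_http_request (send_json is a hypothetical
# method that would wrap the send_data / close_connection_after_writing calls):
#
#   responder = OneShotResponder.new { |body| send_json(body) }
#   timer = EM::Timer.new(API_REQUEST_TIMEOUT) { responder.time_out(result['rid']) }
#   cb = EM.Callback { |rid, rtime, msg|
#     timer.cancel
#     responder.succeed(rid, rtime, msg)
#   }
```

Presumably the application timer would have to fire before comm_inactivity_timeout does, since once the inactivity timeout closes the connection there is nothing left to write to.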