您总是需要阻塞一段时间才能同步事物。您实际上可以将请求发送到一组工作人员,并在收到响应时 - 如果它不是后续响应,则对其进行缓冲。一个简单的工作流程可以用一种伪语言描述如下:
socket receiver; # zmq.PULL
socket workers; # zmq.DEALER, the worker thread socket is started as zmq.DEALER too.
poller = poller(receiver, workers);
next_id_req = incr()
out_queue = queue;
out_queue.last_id = next_id_req
buffer = sorted_queue;
sock = poller.poll()
if sock is receiver:
packet_N = receiver.recv()
# send N for processing
worker.send(packet_N, ++next_id_req)
else if sock is workers:
# get a processed response Func(N)
func_N_response, id = workers.recv()
if out_queue.last_id != id-1:
# not subsequent id, buffer it
buffer.push(id, func_N_rseponse)
else:
# in order, push to out queue
out_queue.push(id, func_N_response)
# also consume all buffered subsequent items
while (out_queue.last_id == buffer.min_id() - 1):
id, buffered_N_resp = buffer.pop()
out_queue.push(id, buffered_N_resp)
但是问题来了,如果数据包在处理线程(工作池)中丢失会发生什么。您可以在一定超时后跳过它(将缓冲区刷新到输出队列中),并继续填充输出队列,然后当数据包稍后到来时重新排序,如果有的话。