0

我有以下问题正在乞求 zmq 解决方案。我有一个时间序列数据:

A,B,C,D,E,...

我需要在每个点上执行一个操作 Func。

通过 zmq 使用多个 worker 并行化任务是很有意义的。但是,让我感到困惑的是如何同步结果,即结果应该按照输入数据的输入方式按时间顺序排列。所以最终结果应该如下所示:

Func(A), Func(B), Func(C), Func(D),...

我还应该指出完成的时间,比如说,Func(A) 将与 Func(B) 略有不同。这可能需要我阻止一段时间。

任何建议将不胜感激。

4

1 回答 1

1

您总是需要阻塞一段时间才能同步事物。您实际上可以将请求发送到一组工作人员,并在收到响应时 - 如果它不是后续响应,则对其进行缓冲。一个简单的工作流程可以用一种伪语言描述如下:

socket receiver; # zmq.PULL
socket workers; # zmq.DEALER, the worker thread socket is started as zmq.DEALER too.
poller = poller(receiver, workers);

next_id_req = incr()
out_queue = queue;
out_queue.last_id = next_id_req
buffer = sorted_queue;

sock = poller.poll()
if sock is receiver:
    packet_N = receiver.recv()
    # send N for processing
    worker.send(packet_N, ++next_id_req)

else if sock is workers:
   # get a processed response Func(N)
   func_N_response, id = workers.recv()
   if out_queue.last_id != id-1:
       # not subsequent id, buffer it
       buffer.push(id, func_N_rseponse)
   else:
       # in order, push to out queue
       out_queue.push(id, func_N_response)

       # also consume all buffered subsequent items
       while (out_queue.last_id == buffer.min_id() - 1):
           id, buffered_N_resp = buffer.pop()
           out_queue.push(id, buffered_N_resp)

但是问题来了,如果数据包在处理线程(工作池)中丢失会发生什么。您可以在一定超时后跳过它(将缓冲区刷新到输出队列中),并继续填充输出队列,然后当数据包稍后到来时重新排序,如果有的话。

于 2013-12-16T20:46:26.180 回答