我正在使用 inproc zmq 套接字进行多线程数据共享。我有多个线程客户端到一个主线程;每个客户端都有一个 PUSH 套接字,主节点有一个 PULL 套接字,作为所有客户端的接收器。在大多数情况下,每个客户端都是独立的,但我确实有一些适度的订单要求,因为一个客户端线程相当特殊。
这是一些代码,说明了我的问题的变体:
import threading
import zmq
context = zmq.Context()
pull = context.socket(zmq.PULL)
pull.bind('inproc://my-socket')
def slave():
global context
push = context.socket(zmq.PUSH)
push.connect('inproc://my-socket')
for x in 'one two three'.split(' '):
push.send('>>> '+x)
#push.send('END')
push.close()
def master():
global context
push = context.socket(zmq.PUSH)
push.connect('inproc://my-socket')
for x in 'one two three'.split(' '):
push.send(x)
x = threading.Thread(target=slave)
x.start()
while x.is_alive():
pass
push.send('END')
push.close()
thread = threading.Thread(target=master)
thread.start()
while True:
if pull.poll():
x = pull.recv()
if x == 'END':
print 'END - exiting'
break
print x
从属线程完全在主线程发送它的主要有效负载之后启动的事实使我期望所有主线程的数据都在从属的数据之前。然而,情况并非总是如此。考虑以下输出(实际上,顺序并不一致,但我确实得到了这个顺序):
$ python zmq_threads.py
one
two
>>> one
three
>>> two
END - exiting
我希望以下订单可靠,我相信这个订单是由主/从安排强制发送的
$ python zmq_threads.py
one
two
three
>>> one
>>> two
>>> three
END - exiting
考虑一下,我可以看到多个套接字客户端不会承诺这种同步。但是,感觉我应该能够以某种方式刷新某些东西以强制执行 recv 的顺序(尤其是使用 inproc 传输)。有任何想法吗?