0

我正在使用 inproc zmq 套接字进行多线程数据共享。我有多个线程客户端到一个主线程;每个客户端都有一个 PUSH 套接字,主节点有一个 PULL 套接字,作为所有客户端的接收器。在大多数情况下,每个客户端都是独立的,但我确实有一些适度的订单要求,因为一个客户端线程相当特殊。

这是一些代码,说明了我的问题的变体:

import threading
import zmq

context = zmq.Context()

pull = context.socket(zmq.PULL)
pull.bind('inproc://my-socket')

def slave():
    global context
    push = context.socket(zmq.PUSH)
    push.connect('inproc://my-socket')

    for x in 'one two three'.split(' '):
        push.send('>>> '+x)
    #push.send('END')
    push.close()

def master():
    global context
    push = context.socket(zmq.PUSH)
    push.connect('inproc://my-socket')

    for x in 'one two three'.split(' '):
        push.send(x)

    x = threading.Thread(target=slave)
    x.start()

    while x.is_alive():
        pass

    push.send('END')
    push.close()

thread = threading.Thread(target=master)
thread.start()

while True:
    if pull.poll():
        x = pull.recv()
        if x == 'END':
            print 'END - exiting'
            break
        print x

从属线程完全在主线程发送它的主要有效负载之后启动的事实使我期望所有主线程的数据都在从属的数据之前。然而,情况并非总是如此。考虑以下输出(实际上,顺序并不一致,但我确实得到了这个顺序):

$ python zmq_threads.py 
one
two
>>> one
three
>>> two
END - exiting

我希望以下订单可靠,我相信这个订单是由主/从安排强制发送的

$ python zmq_threads.py 
one
two
three
>>> one
>>> two
>>> three
END - exiting

考虑一下,我可以看到多个套接字客户端不会承诺这种同步。但是,感觉我应该能够以某种方式刷新某些东西以强制执行 recv 的顺序(尤其是使用 inproc 传输)。有任何想法吗?

4

1 回答 1

0

我已经通过让从线程使用不同的套接字来解决这个问题,该套接字在主线程中接收并发送到主程序。理论上,我认为这里可以使用 STREAMER 设备将 inproc://my-socket-2 连接到 inproc://my-socket,但我认为我没有更多的顺序保证我只是在其中添加另一个线程主持 STREAMER 的混音。

这是我的问题代码的另一种版本,使用了说明的技术。在所有情况下,我都觉得需要一个“END”信标相当烦人,但我不知道是否有更好的 zmq/thread 机制。

import threading
import zmq

context = zmq.Context()

pull = context.socket(zmq.PULL)
pull.bind('inproc://my-socket')

def slave():
    global context
    push = context.socket(zmq.PUSH)
    push.connect('inproc://my-socket-2')

    for x in 'one two three'.split(' '):
        push.send('>>> '+x)
    push.send('END')
    push.close()

def master():
    global context
    push = context.socket(zmq.PUSH)
    push.connect('inproc://my-socket')

    for x in 'one two three'.split(' '):
        push.send(x)

    slaved_pull = context.socket(zmq.PULL)
    slaved_pull.bind('inproc://my-socket-2')
    x = threading.Thread(target=slave)
    x.start()

    while True:
        if slaved_pull.poll():
            x = slaved_pull.recv()
            if x == 'END':
                #print 'END - exiting'
                break
            push.send(x)

    push.send('END')
    push.close()

thread = threading.Thread(target=master)
thread.start()

while True:
    if pull.poll():
        x = pull.recv()
        if x == 'END':
            print 'END - exiting'
            break
        print x
于 2013-08-31T15:22:29.570 回答