9

我想编写一个执行以下操作的python脚本(称为parent):

(1)定义一个多维numpy数组

(2) forks 10 种不同的python脚本(称它们为孩子)。它们中的每一个都必须能够在任何单个时间点从(1)获取数组read的内容(只要它们还活着)。numpy

(3)每个脚本都会做自己的工作(孩子们不要互相分享任何信息)

(4)在任何时间点,脚本必须能够接受来自其所有脚本的消息。这些消息将由父级解析并导致(1)numpy中的数组发生更改。


python在环境中工作时,我该如何处理Linux?我想使用zeroMQ并让父母成为单个订阅者,而孩子们都将成为发布者;这有意义还是有更好的方法?

另外,如何让所有级连续读取numpy级定义的数组的内容?

4

3 回答 3

18

sub频道不一定是要绑定的频道,因此您可以让订阅者绑定,并且每个子pub频道都可以连接到该频道并发送他们的消息。在这种特殊情况下,我认为该multiprocessing模块更适合,但我认为它值得一提:

import zmq
import threading

# So that you can copy-and-paste this into an interactive session, I'm
# using threading, but obviously that's not what you'd use

# I'm the subscriber that multiple clients are writing to
def parent():
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.SUBSCRIBE, 'Child:')
    # Even though I'm the subscriber, I'm allowed to get this party 
    # started with `bind`
    socket.bind('tcp://127.0.0.1:5000')

    # I expect 50 messages
    for i in range(50):
        print 'Parent received: %s' % socket.recv()

# I'm a child publisher
def child(number):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    # And even though I'm the publisher, I can do the connecting rather
    # than the binding
    socket.connect('tcp://127.0.0.1:5000')

    for data in range(5):
        socket.send('Child: %i %i' % (number, data))
    socket.close()

threads = [threading.Thread(target=parent)] + [threading.Thread(target=child, args=(i,)) for i in range(10)]
for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

特别是,文档的核心消息传递模式部分讨论了对于模式,任何一方都可以绑定(和另一方连接)的事实。

于 2012-05-22T03:37:18.200 回答
4

我认为使用 PUSH/PULL 插座更有意义,因为您有一个标准的Ventilator - Workers - Sink场景,除了 Ventilator 和 Sink 是相同的过程。

另外,考虑使用多处理模块而不是 ZeroMQ。可能会容易一些。

于 2011-07-14T21:46:18.903 回答
-2

在 ZeroMQ 中,每个端口只能有一个发布者。唯一(丑陋的)解决方法是在不同的端口上启动每个子 PUB 套接字,并让父级监听所有这些端口。

但是 0MQ 上描述的管道模式,用户指南是一种更好的方法。

于 2011-07-26T14:07:32.897 回答