4

使用0mq在进程之间建立双向通信的最正确方法是什么?我需要创建几个后台进程来等待来自主进程的命令,执行一些计算并将结果返回给主进程。

4

2 回答 2

7

There are a few ways to do this. The most straight forward approach might be to use REQ/REP sockets. Each background process/worker would have a REP socket, and you would use a REQ socket to communicate with them:

import zmq

def worker(addr):
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(addr)
    while True:
        # get message from boss
        msg = socket.recv()
        # ...do smth
        # send back results
        socket.send(msg)

if __name__ == '__main__':
    # spawn 5 workers
    from multiprocessing import Process
    for i in range(5):
        Process(target=worker, args=('tcp://127.0.0.1:500%d' % i,)).start()

You'd have to connect to each worker to send them a message, and get back results:

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect(worker_addr)
socket.send('message')
msg = socket.recv()

Another approach would be to use PUB/SUB to fire off messages to the workers and PUSH/PULL to harvest results:

import zmq

def worker(worker_id, publisher_addr, results_addr):
    context = zmq.Context()
    sub = context.socket(zmq.SUB)
    sub.connect(publisher_addr)
    sub.setsockopt(zmq.SUBSCRIBE, worker_id)
    push = context.socket(zmq.PUSH)
    push.connect(results_addr)

    while True:
        msg = sub.recv_multipart()[1]
        # do smth, send off results
        push.send_multipart([worker_id, msg])

if __name__ == '__main__':
    publisher_addr = 'tcp://127.0.0.1:5000'
    results_addr = 'tcp://127.0.0.1:5001'

    # launch some workers into space
    from multiprocessing import Process
    for i in range(5):
        Process(target=worker, args=('worker-%d' % i, publisher_addr, results_addr,)).start()

To broadcast a command to a specific worker, you'd do something like:

context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.bind(publisher_addr)
# send message to worker-1
pub.send_multipart(['worker-1', 'hello'])

Pull in results:

context = zmq.Context()
pull = context.socket(zmq.PULL)
pull.bind(results_addr)

while True:
    worker_id, result = pull.recv_multipart()
    print worker_id, result
于 2011-08-24T17:24:01.077 回答
3

考虑使用请求回复代理,但将 REQ 套接字交换到 DEALER。DEALER 不会阻塞发送,并会自动对您的工作人员的流量进行负载平衡。

在图片Client中将是你的main process并且Service A/B/C是你的background processes (workers)Main process应该绑定到端点。Workers应该连接到主进程的端点以接收工作项。

main process保留工作项目列表和发送时间。如果一段时间内没有答案,请再次重新发送工作项,因为worker可能已经死了。

请求回复经纪人

于 2011-08-24T17:17:35.297 回答