2

嗨,我在把一些 ZMQ PULL 客户端包装成 Python 类时遇到了问题。这些类通过 multiprocessing 模块在子进程中实例化并调用。客户端写成函数时一切正常,但写成类时 poller.poll() 会挂起。

下面的代码有两个版本:一个有效,另一个无效。为什么?

import zmq
import time
import sys
import random
from  multiprocessing import Process

def server_push(port="5556"):
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.bind("tcp://*:%s" % port)
    print "Running server on port: ", port
    # serves only 5 request and dies
    for reqnum in range(10):
        if reqnum < 6:
            socket.send("Continue")
        else:
            socket.send("Exit")
            break
        time.sleep (1) 

def server_pub(port="5558"):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:%s" % port)
    publisher_id = random.randrange(0,9999)
    print "Running server on port: ", port
    # serves only 5 request and dies
    for reqnum in range(10):
        # Wait for next request from client
        topic = random.randrange(8,10)
        messagedata = "server#%s" % publisher_id
        print "%s %s" % (topic, messagedata)
        socket.send("%d %s" % (topic, messagedata))
        time.sleep(1)    


class Client:
    """Callable client that polls a PULL socket and a SUB socket.

    NOTE(review): the ZMQ context and both sockets are created in
    ``__init__``, i.e. in whichever process constructs the instance. When the
    instance is handed to ``Process(target=...)``, construction happens in the
    parent while ``__call__`` runs in the child process.
    """
    def __init__(self,port_push, port_sub):
        # Connect the PULL socket to the control-command server.
        context = zmq.Context()
        self.socket_pull = context.socket(zmq.PULL)
        self.socket_pull.connect ("tcp://localhost:%s" % port_push)
        print "Connected to server with port %s" % port_push
        # Connect the SUB socket and subscribe to messages prefixed with "9".
        self.socket_sub = context.socket(zmq.SUB)
        self.socket_sub.connect ("tcp://localhost:%s" % port_sub)
        self.socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
        print "Connected to publisher with port %s" % port_sub
        # The poller itself is built in __call__, not here.


    def __call__(self):
        """Poll both sockets until an "Exit" command arrives on the PULL socket."""
        poller = zmq.Poller()
        poller.register(self.socket_pull, zmq.POLLIN)
        poller.register(self.socket_sub, zmq.POLLIN)
        # Work on requests from both server and publisher
        should_continue = True
        print "listening"
        while should_continue:
            print "hello"
            # poll() is called without a timeout here.
            socks = dict(poller.poll())
            print poller
            if self.socket_pull in socks and socks[self.socket_pull] == zmq.POLLIN:
                message = self.socket_pull.recv()
                print "Recieved control command: %s" % message
                if message == "Exit": 
                    print "Recieved exit command, client will stop recieving messages"
                    should_continue = False

                # NOTE(review): this check is nested inside the PULL branch, so
                # SUB messages are only read on iterations where a PULL message
                # was also ready.
                if self.socket_sub in socks and socks[self.socket_sub] == zmq.POLLIN:
                    string = self.socket_sub.recv()
                    topic, messagedata = string.split()
                    print "Processing ... ", topic, messagedata

def client(port_push, port_sub):
    context = zmq.Context()
    socket_pull = context.socket(zmq.PULL)
    socket_pull.connect ("tcp://localhost:%s" % port_push)
    print "Connected to server with port %s" % port_push
    socket_sub = context.socket(zmq.SUB)
    socket_sub.connect ("tcp://localhost:%s" % port_sub)
    socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
    print "Connected to publisher with port %s" % port_sub
    # Initialize poll set
    poller = zmq.Poller()
    poller.register(socket_pull, zmq.POLLIN)
    poller.register(socket_sub, zmq.POLLIN)
    # Work on requests from both server and publisher
    should_continue = True
    while should_continue:
        socks = dict(poller.poll())
        if socket_pull in socks and socks[socket_pull] == zmq.POLLIN:
            message = socket_pull.recv()
            print "Recieved control command: %s" % message
            if message == "Exit": 
                print "Recieved exit command, client will stop recieving messages"
                should_continue = False

        if socket_sub in socks and socks[socket_sub] == zmq.POLLIN:
            string = socket_sub.recv()
            topic, messagedata = string.split()
            print "Processing ... ", topic, messagedata

if __name__ == "__main__":
    # Start the PUSH and PUB servers, then one client, each in its own process.
    server_push_port, server_pub_port = "5556", "5558"
    push_server = Process(target=server_push, args=(server_push_port,))
    push_server.start()
    pub_server = Process(target=server_pub, args=(server_pub_port,))
    pub_server.start()
    # Function-based client (alternative to the class-based one below):
    #~ Process(target=client,args=(server_push_port,server_pub_port)).start()
    # The Client instance is constructed here, in the parent, and only its
    # __call__ runs in the new process.
    class_client = Client(server_push_port, server_pub_port)
    Process(target=class_client).start()
4

1 回答 1

2

Edit1:这不太正确......给我一些时间来纠正它......

我认为您可能以错误的方式调用了 Client 类。我不是这方面的专家,但我认为您的客户端应该继承 Process,然后通过 .start() 方法运行。因此,请像这样定义您的 Client 类:

class Client(Process):
    # Sketch only. A Process subclass that overrides __init__ must invoke
    # Process.__init__(self) first; the work executed in the child process
    # goes in run().
    def __init__(self, port_push, port_sub):
        (...) # your class init code here...make sure indentation is correct

然后在运行服务器的最后,创建一个 Client 类的实例并像这样启动它:

# Build the Process subclass instance, then launch it; start() arranges for
# the object's run() method to be invoked in a separate process.
client_class = Client(port_push, port_sub)
client_class.start()

Edit2:这是对我有用的 fccoelho 代码的编辑版本。

最大的问题似乎是 ZMQ 的初始化需要在 __call__ 方法中完成,而不是在 __init__ 中。我怀疑这与 multiprocessing 的内存分配方式有关:__init__ 在父进程中执行,而 __call__ 在拥有独立内存空间的子进程中执行。显然 ZMQ 不喜欢这样。我还加了一些等待时间,防止客户端在服务器就绪之前连接,也防止服务器在客户端完成订阅之前发送消息。另外改用 127.0.0.1 而不是 localhost(出于某种原因,我的电脑不喜欢 localhost)。还删除了客户端 poll 调用前后烦人的打印语句,并修复了客户端检查 pub/sub 套接字轮询结果处的缩进问题。

import zmq
import time
import sys
import random
from  multiprocessing import Process

def server_push(port="5556"):
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.bind("tcp://127.0.0.1:%s" % port)
    print "Running server on port: ", port
    time.sleep(1.0)
    # serves only 5 request and dies
    for reqnum in range(10):
        if reqnum < 6:
            socket.send("Continue")
        else:
            socket.send("Exit")
            print 'Push server sent "Exit" signal'
            break
        time.sleep(0.4) 

def server_pub(port="5558"):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://127.0.0.1:%s" % port)
    socket.setsockopt(zmq.HWM, 1000)
    publisher_id = random.randrange(0,9999)
    print "Running server on port: ", port
    time.sleep(1.0)
    # serves only 5 request and dies
    for reqnum in range(10):
        # Wait for next request from client
        topic = random.randrange(8,10)
        messagedata = "server#%s" % publisher_id
        print "%s %s" % (topic, messagedata)
        socket.send("%d %s" % (topic, messagedata))
        time.sleep(0.4)    


class Client:
    def __init__(self,port_push, port_sub):
        self.port_push = port_push
        self.port_sub = port_sub
        # Initialize poll set

    def __call__(self):
        time.sleep(0.5)
        print 'hello from class client!'
        context = zmq.Context()
        self.socket_pull = context.socket(zmq.PULL)
        self.socket_pull.connect ("tcp://127.0.0.1:%s" % self.port_push)
        print "Connected to server with port %s" % self.port_push
        self.socket_sub = context.socket(zmq.SUB)
        self.socket_sub.connect ("tcp://127.0.0.1:%s" % self.port_sub)
        self.socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
        print "Connected to publisher with port %s" % self.port_sub

        poller = zmq.Poller()
        poller.register(self.socket_pull, zmq.POLLIN)
        poller.register(self.socket_sub, zmq.POLLIN)
        # Work on requests from both server and publisher
        should_continue = True
        print "listening"
        while should_continue:
            # print "hello"
            socks = dict(poller.poll())
            # print poller
            if self.socket_pull in socks and socks[self.socket_pull] == zmq.POLLIN:
                message = self.socket_pull.recv()
                print "Recieved control command: %s" % message
                if message == "Exit": 
                    print "Recieved exit command, client will stop recieving messages"
                    should_continue = False

            if self.socket_sub in socks and socks[self.socket_sub] == zmq.POLLIN:
                string = self.socket_sub.recv()
                topic, messagedata = string.split()
                print "Processing ... ", topic, messagedata

def client(port_push, port_sub):
    print 'hello from function client!'
    context = zmq.Context()
    socket_pull = context.socket(zmq.PULL)
    socket_pull.connect ("tcp://127.0.0.1:%s" % port_push)
    print "Connected to server with port %s" % port_push
    socket_sub = context.socket(zmq.SUB)
    socket_sub.connect ("tcp://127.0.0.1:%s" % port_sub)
    socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
    print "Connected to publisher with port %s" % port_sub
    # Initialize poll set
    poller = zmq.Poller()
    poller.register(socket_pull, zmq.POLLIN)
    poller.register(socket_sub, zmq.POLLIN)
    # Work on requests from both server and publisher
    should_continue = True
    while should_continue:
        socks = dict(poller.poll(1000))
        if socket_pull in socks and socks[socket_pull] == zmq.POLLIN:
            message = socket_pull.recv()
            print "Recieved control command: %s" % message
            if message == "Exit": 
                print "Recieved exit command, client will stop recieving messages"
                should_continue = False

        if socket_sub in socks and socks[socket_sub] == zmq.POLLIN:
            string = socket_sub.recv()
            topic, messagedata = string.split()
            print "Processing ... ", topic, messagedata

if __name__ == "__main__":
    # Start the PUSH and PUB servers, then one client, each in its own process.
    server_push_port, server_pub_port = "5556", "5558"
    push_server = Process(target=server_push, args=(server_push_port,))
    push_server.start()
    pub_server = Process(target=server_pub, args=(server_pub_port,))
    pub_server.start()
    # Function-based client (alternative to the class-based one below):
    # Process(target=client,args=(server_push_port,server_pub_port)).start()
    class_client = Client(server_push_port, server_pub_port)
    Process(target=class_client).start()

最后,这是一个更简洁的多进程 pub/sub 实现,它非常简单,但能更清楚地说明问题:

import zmq
from multiprocessing import Process
import time

class ServerPubSub(Process):
    def __init__(self, port, n):
        Process.__init__(self)
        self.port = port
        self.n = n

    def run(self):
        self.context = zmq.Context()
        self.pub = self.context.socket(zmq.PUB)
        self.pub.bind('tcp://127.0.0.1:%d' % self.port)
        self.pub.setsockopt(zmq.HWM, 1000)

        time.sleep(1)

        end = False
        for i in range(self.n):
            print 'SRV: sending message %d' % i
            self.pub.send('Message %d' % i)
            print 'SRV: message %d sent' % i
            time.sleep(0.2)

        self.pub.close()

class ClientPubSub(Process):
    def __init__(self, port, n):
        Process.__init__(self)
        self.port = port
        self.n = n

    def run(self):
        self.context = zmq.Context()
        self.sub = self.context.socket(zmq.SUB)
        self.sub.connect('tcp://127.0.0.1:%d' % self.port)
        self.sub.setsockopt(zmq.SUBSCRIBE, '')
        self.poller = zmq.Poller()
        self.poller.register(self.sub, zmq.POLLIN)

        end = False
        count = 0
        while count < self.n:
            ready = dict(self.poller.poll(0))
            if self.sub in ready and ready[self.sub] == zmq.POLLIN:
                msg = self.sub.recv()
                print 'CLI: received message "%s"' % msg
                count += 1

        self.sub.close()

if __name__ == "__main__":
    # Run one publisher and one subscriber, then wait for both to finish.
    port = 5000
    n = 10
    server = ServerPubSub(port, n)
    client = ClientPubSub(port, n)

    # Same order as before: server first, then client.
    for proc in (server, client):
        proc.start()

    for proc in (server, client):
        proc.join()
于 2012-03-19T13:21:36.183 回答