1

我有一些代码正在监视其他一些更改的文件,我想做的是启动使用 zeromq 和不同套接字的代码,我现在这样做的方式似乎导致断言在 libzmq 的某个地方失败,因为我可能正在重用同一个插座。我如何确保当我从监视器类创建一个新进程时,上下文不会被重用?这就是我认为正在发生的事情,如果您能说我还有其他愚蠢之处,请告知。这是一些代码:

import zmq
from zmq.eventloop import ioloop
from  zmq.eventloop.zmqstream  import ZMQStream
class Monitor(object):
    def __init(self)
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.DEALER)
        self.socket.connect("tcp//127.0.0.1:5055")
        self.stream =  ZMQStream(self._socket)
        self.stream.on_recv(self.somefunc)

    def initialize(self,id)
        self._id = id

    def somefunc(self, something)
        """work here and send back results if any """
        import json
        jdecoded = json.loads(something)
        if self_id == jdecoded['_id']
           """ good im the right monitor for you """
           work = jdecoded['message']
           results  = algorithm (work)
           self.socket.send(json.dumps(results))
        else:
           """let some other process deal with it, not mine """
           pass

 class Prefect(object):
    def __init(self, id)
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.DEALER)
        self.socket.bind("tcp//127.0.0.1:5055")
        self.stream =  ZMQStream(self._socket)
        self.stream.on_recv(self.check_if)
        self._id = id
        self.monitors = []
    def check_if(self,message):
        """find out from message's id whether we have
            started a proces for it previously"""
        import json
        jdecoded = json.loads(message)
        this_id = jdecoded['_id']
        if this_id in self.monitors:
            pass
        else:
            """start new process for it should have its won socket """
            new = Monitor()
            import Process
            newp = Process(target=new.initialize,args=(this_id) )
            newp.start()
            self.monitors.append(this_id) ## ensure its remembered

发生的事情是我想要所有的监控进程和一个单一的高级进程在同一个端口上监听,所以当高级工程师看到一个它没有看到它的请求时,它会为它启动一个进程,所有存在的进程可能也应该监听但是忽略不适合他们的消息。就目前而言,如果我这样做,我可能会遇到一些崩溃,这可能与某些东西对同一个 zmq 套接字的并发访问有关(我尝试过 threading.thread,仍然崩溃)我在某处读到不同线程对 zmq 套接字的并发访问是不可能的. 我将如何确保新进程获得自己的 zmq 套接字?

编辑:我的应用程序中的主要交易是请求通过 zmq 套接字进入,并且正在侦听的进程通过以下方式对消息作出反应:

1. If its directed at that process judged by the _id field, do some reading on a file and reply since one of the monitors match the messages _id, if none match, then:
2  If the messages _id files is not recognized, all monitors ignore it but the Prefect creates a process to handle that _id and all future messages to that id.
3. I want all the messages to be seen by the monitor processes as well as the prefect process, seems that seems easiest, 
4. All the messages are very small, avarage ~4096 bytes.   
5. The monitor does some non-blocking read and for each ioloop it sends what it has found out

more-edit=> 并且完美进程现在绑定,它将接收消息并回显它们,以便监视器可以看到它们。这就是我的想法,作为架构,但它不是最终的。.

所有消息都通过浏览器从远程用户到达,让服务器知道客户端想要什么,服务器通过 zmq 将消息发送到后端(我没有显示这个,但并不难)所以在生产中它们可能不会绑定/连接到本地主机。我选择了 DEALER,因为它允许任意方向的异步/无限消息(见第 5 点。)并且 DEALER 可以与 DEALER 绑定,并且初始请求/回复可以从任何一方到达。另一个可以做到这一点的可能是 DEALER/ROUTER。

4

1 回答 1

2

您是正确的,您不能在分叉边界上继续使用相同的套接字(多处理使用分叉)。一般来说,这意味着您不想创建将在分叉进程中使用的套接字,直到子进程启动之后。由于在您的情况下,套接字是 Monitor 对象的一个​​属性,因此您根本不想在主进程中创建 Monitor。看起来像这样:

def start_monitor(this_id):
    monitor = Monitor()
    monitor.initialize(this_id)
    # run the eventloop, or this will return immediately and destroy the monitor

... inside Prefect.check_if():

    proc = Process(target=start_monitor, args=(this_id,))
    proc.start()
    self.monitors.append(this_id)

而不是您的示例,子进程唯一要做的就是分配一个 ID,然后终止该进程,最终没有任何效果。

于 2013-04-13T20:49:10.847 回答