8

我正在调试从 2 个传感器收集信息的应用程序:一个网络摄像头和一个麦克风。

一般架构非常简单:

  • 主进程通过管道将消息(start、stop、get_data)发送到子进程(每个进程一个)。
  • 子进程收集数据并将其发送到主进程

子进程和主进程处于无限循环来处理命令(来自用户的主进程,来自主进程的子进程)。

它在全球范围内有效,但我无法停止子进程。

我已经记录了代码,它似乎发生了两件事:

  1. “停止”消息已发送,但未通过管道。
  2. 子进程继续发送数据和 conn.send(data) 块。

该行为显然与连接状态相关,因为不发送任何内容的子进程没有此行为。不过,我看不出如何调试/修改当前架构,这似乎是合理的。

那么,是什么导致了这种阻​​塞行为以及如何避免呢?

这是在子进程中为无限循环的每次迭代执行的代码:

    def do(self):
        while self.cnx.poll():
            msg = self.cnx.recv()
            self.queue.append(msg)
        #==
        if not self.queue:
            func_name = 'default_action'
            self.queue.append([func_name, ])
        #==
        msg             = self.queue.pop()
        func_name, args = msg[0], msg[1:]
        #==
        res = self.target.__getattribute__(func_name)(*args)
        #==
        running = func_name != 'stop'
        #==
        if res and self.send:
            assert running
            self.output_queue.append(res[0])
        if self.output_queue and running:
            self.cnx.send(self.output_queue.popleft())
        #==
        return running

更新:似乎管道不能同时在两端写入。如果将上述代码的最后几行更改为:

        if self.output_queue and running:
            if not self.cnx.poll(): 
                self.cnx.send(self.output_queue.popleft())

尽管默认情况下 Pipe 被记录为全双工并且此行为根本没有记录,但问题仍然存在。我一定是误会了什么。请赐教我!

更新 2:为了清楚起见,在这种情况下没有关闭连接。描述事件的顺序:

  • 主进程发送消息(“停止”)(它在发送消息之前清空连接)
  • 主进程进入一个(无限)循环,当子进程终止时该循环停止。
  • 同时,子进程在发送中被阻塞,永远不会收到消息。
4

1 回答 1

4

全双工multiprocessing.Pipe实现为socketpair(). .send与套接字交谈时,调用可能会因所有正常原因而阻塞。根据您的描述,我认为您的阅读器很可能Pipe已退出阅读,并且数据已在内核的缓冲区中累积到您的.send阻塞点。

如果您明确.close接收方SIGPIPE,当您尝试.send. 如果您的接收连接超出范围,这可能会自动发生。您可以通过更加小心不要将引用(直接或间接)存储到接收端来解决问题,以便在该线程消失时释放它。

阻塞的简单演示.send

import multiprocessing

a, b = multiprocessing.Pipe()

while True:
    print "send!"
    a.send("hello world")

现在请注意,一段时间后它会退出打印“发送!”

于 2012-09-05T01:20:33.897 回答