我正在调试从 2 个传感器收集信息的应用程序:一个网络摄像头和一个麦克风。
一般架构非常简单:
- 主进程通过管道将消息(start、stop、get_data)发送到子进程(每个进程一个)。
- 子进程收集数据并将其发送到主进程
子进程和主进程处于无限循环来处理命令(来自用户的主进程,来自主进程的子进程)。
它在全球范围内有效,但我无法停止子进程。
我已经记录了代码,它似乎发生了两件事:
- “停止”消息已发送,但未通过管道。
- 子进程继续发送数据和 conn.send(data) 块。
该行为显然与连接状态相关,因为不发送任何内容的子进程没有此行为。不过,我看不出如何调试/修改当前架构,这似乎是合理的。
那么,是什么导致了这种阻塞行为以及如何避免呢?
这是在子进程中为无限循环的每次迭代执行的代码:
def do(self):
while self.cnx.poll():
msg = self.cnx.recv()
self.queue.append(msg)
#==
if not self.queue:
func_name = 'default_action'
self.queue.append([func_name, ])
#==
msg = self.queue.pop()
func_name, args = msg[0], msg[1:]
#==
res = self.target.__getattribute__(func_name)(*args)
#==
running = func_name != 'stop'
#==
if res and self.send:
assert running
self.output_queue.append(res[0])
if self.output_queue and running:
self.cnx.send(self.output_queue.popleft())
#==
return running
更新:似乎管道不能同时在两端写入。如果将上述代码的最后几行更改为:
if self.output_queue and running:
if not self.cnx.poll():
self.cnx.send(self.output_queue.popleft())
尽管默认情况下 Pipe 被记录为全双工并且此行为根本没有记录,但问题仍然存在。我一定是误会了什么。请赐教我!
更新 2:为了清楚起见,在这种情况下没有关闭连接。描述事件的顺序:
- 主进程发送消息(“停止”)(它在发送消息之前清空连接)
- 主进程进入一个(无限)循环,当子进程终止时该循环停止。
- 同时,子进程在发送中被阻塞,永远不会收到消息。