我正在尝试模拟使用扭曲运行的应用程序网络。作为我模拟的一部分,我想同步某些事件并能够为每个进程提供大量数据。我决定使用多处理事件和队列。但是,我的进程正在挂起。
我写了下面的示例代码来说明问题。具体来说,(大约 95% 的时间在我的沙桥机器上)“run_in_thread”函数完成,但是直到我按下 Ctrl-C 后才调用“print_done”回调。
此外,我可以更改示例代码中的一些内容以使其工作更可靠,例如:减少衍生进程的数量、从 reactor_ready 调用 self.ready.set 或更改 deferLater 的延迟。
我猜在扭曲反应器和阻塞多处理调用(如 Queue.get() 或 Event.wait() 之间存在竞争条件?
我遇到的问题到底是什么?我的代码中是否有我遗漏的错误?我可以解决这个问题还是扭曲与多处理事件/队列不兼容?
其次,像 spawnProcess 或 Ampoule 这样的东西会是推荐的替代品吗?(如Mix Python Twisted with multiprocessing? 中所建议的那样?)
编辑(根据要求):
我尝试过 glib2reactor selectreactor、pollreactor 和 epollreactor 的所有反应器都遇到了问题。epollreactor 似乎给出了最好的结果,并且对于下面给出的示例似乎工作正常,但在我的应用程序中仍然给我同样(或类似)的问题。我会继续调查。
我正在运行 Gentoo Linux 内核 3.3 和 3.4、python 2.7,并且我尝试过 Twisted 10.2.0、11.0.0、11.1.0、12.0.0 和 12.1.0。
除了我的沙桥机器外,我在双核 AMD 机器上也看到了同样的问题。
#!/usr/bin/python
# -*- coding: utf-8 *-*
from twisted.internet import reactor
from twisted.internet import threads
from twisted.internet import task
from multiprocessing import Process
from multiprocessing import Event
class TestA(Process):
def __init__(self):
super(TestA, self).__init__()
self.ready = Event()
self.ready.clear()
self.start()
def run(self):
reactor.callWhenRunning(self.reactor_ready)
reactor.run()
def reactor_ready(self, *args):
task.deferLater(reactor, 1, self.node_ready)
return args
def node_ready(self, *args):
print 'node_ready'
self.ready.set()
return args
def reactor_running():
print 'reactor_running'
df = threads.deferToThread(run_in_thread)
df.addCallback(print_done)
def run_in_thread():
print 'run_in_thread'
for n in processes:
n.ready.wait()
def print_done(dfResult=None):
print 'print_done'
reactor.stop()
if __name__ == '__main__':
processes = [TestA() for i in range(8)]
reactor.callWhenRunning(reactor_running)
reactor.run()