6

我正在尝试模拟使用扭曲运行的应用程序网络。作为我模拟的一部分,我想同步某些事件并能够为每个进程提供大量数据。我决定使用多处理事件和队列。但是,我的进程正在挂起。

我写了下面的示例代码来说明问题。具体来说,(大约 95% 的时间在我的沙桥机器上)“run_in_thread”函数完成,但是直到我按下 Ctrl-C 后才调用“print_done”回调。

此外,我可以更改示例代码中的一些内容以使其工作更可靠,例如:减少衍生进程的数量、从 reactor_ready 调用 self.ready.set 或更改 deferLater 的延迟。

我猜在扭曲反应器和阻塞多处理调用(如 Queue.get() 或 Event.wait() 之间存在竞争条件?

我遇到的问题到底是什么?我的代码中是否有我遗漏的错误?我可以解决这个问题还是扭曲与多处理事件/队列不兼容?

其次,像 spawnProcess 或 Ampoule 这样的东西会是推荐的替代品吗?(如Mix Python Twisted with multiprocessing? 中所建议的那样?

编辑(根据要求):

我尝试过 glib2reactor selectreactor、pollreactor 和 epollreactor 的所有反应器都遇到了问题。epollreactor 似乎给出了最好的结果,并且对于下面给出的示例似乎工作正常,但在我的应用程序中仍然给我同样(或类似)的问题。我会继续调查。

我正在运行 Gentoo Linux 内核 3.3 和 3.4、python 2.7,并且我尝试过 Twisted 10.2.0、11.0.0、11.1.0、12.0.0 和 12.1.0。

除了我的沙桥机器外,我在双核 AMD 机器上也看到了同样的问题。

#!/usr/bin/python
# -*- coding: utf-8 *-*

from twisted.internet import reactor
from twisted.internet import threads
from twisted.internet import task

from multiprocessing import Process
from multiprocessing import Event

class TestA(Process):
    def __init__(self):
        super(TestA, self).__init__()
        self.ready = Event()
        self.ready.clear()
        self.start()

    def run(self):
        reactor.callWhenRunning(self.reactor_ready)
        reactor.run()

    def reactor_ready(self, *args):
        task.deferLater(reactor, 1, self.node_ready)
        return args

    def node_ready(self, *args):
        print 'node_ready'
        self.ready.set()
        return args

def reactor_running():
    print 'reactor_running'
    df = threads.deferToThread(run_in_thread)
    df.addCallback(print_done)

def run_in_thread():
    print 'run_in_thread'
    for n in processes:
        n.ready.wait()

def print_done(dfResult=None):
    print 'print_done'
    reactor.stop()

if __name__ == '__main__':
    processes = [TestA() for i in range(8)]
    reactor.callWhenRunning(reactor_running)
    reactor.run()
4

1 回答 1

10

简短的回答是肯定的,Twisted 和 multiprocessing 彼此不兼容,您无法像尝试那样可靠地使用它们。

在所有 POSIX 平台上,子进程管理与处理密切相关SIGCHLD。POSIX 信号处理程序是进程全局的,每种信号类型只能有一个。

Twisted 和 stdlibmultiprocessing不能同时SIGCHLD安装处理程序。只有其中一个可以。这意味着其中只有一个可以可靠地管理子进程。您的示例应用程序无法控制其中哪些将赢得该能力,因此我预计其行为会因该事实而产生一些不确定性。

但是,您的示例更直接的问题是您在父进程中加载​​ Twisted,然后使用multiprocessingfork而不是执行所有子进程。Twisted 不支持这样使用。如果你 fork 然后 exec,没有问题。但是,缺少新进程(可能是使用 Twisted 的 Python 进程)的 exec 会导致 Twisted 无法解释的各种额外共享状态。在您的特定情况下,导致此问题的共享状态是用于实现deferToThread. 由于父进程和所有子进程共享 fd,当父进程试图唤醒主线程以传递deferToThread调用结果时,它很可能会唤醒其中一个子进程. 子进程没有什么有用的事情可做,所以这只是浪费时间。同时,父线程中的主线程永远不会醒来,也不会注意到您的线程任务已完成。

您可以通过在创建子进程之前不加载任何 Twisted 来避免此问题。就 Twisted 而言,这会将您的使用变成一个单进程用例(在每个进程中,它最初会被加载,然后该进程根本不会继续分叉,因此不存在如何分叉和扭曲互动了)。这意味着在创建子进程之前甚至不导入 Twisted。

当然,这只对 Twisted 有帮助。您使用的任何其他库都可能遇到类似的问题(您提到了 glib2,这是另一个库的一个很好的例子,如果您尝试像这样使用它会完全窒息)。

我强烈建议不要使用该multiprocessing模块。相反,使用任何涉及 forkexec 的多进程方法,而不是单独使用 fork。安瓿属于这一类。

于 2012-07-01T15:50:15.433 回答