7

所以,我有一个使用 Twisted + Stomper 作为 STOMP 客户端的应用程序,它将工作分配给 multiprocessing.Pool 的工作人员。

当我只使用 python 脚本启动它时,这似乎工作正常,它(简化)看起来像这样:

# stompclient.py

logging.config.fileConfig(config_path)
logger = logging.getLogger(__name__)

# Add observer to make Twisted log via python
twisted.python.log.PythonLoggingObserver().start() 

# initialize the process pool.  (child processes get forked off immediately)
pool = multiprocessing.Pool(processes=processes)

StompClientFactory.username = username
StompClientFactory.password = password
StompClientFactory.destination = destination
reactor.connectTCP(host, port, StompClientFactory())
reactor.run()

当它被打包用于部署时,我想我会利用 twistd 脚本并从 tac 文件运行它。

这是我看起来非常相似的 tac 文件:

# stompclient.tac

logging.config.fileConfig(config_path)
logger = logging.getLogger(__name__)

# Add observer to make Twisted log via python
twisted.python.log.PythonLoggingObserver().start() 

# initialize the process pool.  (child processes get forked off immediately)
pool = multiprocessing.Pool(processes=processes)

StompClientFactory.username = username
StompClientFactory.password = password
StompClientFactory.destination = destination

application = service.Application('myapp')

service = internet.TCPClient(host, port, StompClientFactory())
service.setServiceParent(application)

为了说明起见,我折叠或更改了一些细节;希望它们不是问题的本质。例如,我的应用程序有一个插件系统,池由一个单独的方法初始化,然后使用 pool.apply_async() 将工作委托给池,传递我的插件的一个 process() 方法。

因此,如果我运行脚本 (stompclient.py),一切都会按预期工作。

如果我在非守护程序模式(-n)下运行twist,它似乎也可以正常工作:

twistd -noy stompclient.tac

但是,当我在守护程序模式下运行时它不起作用:

twistd -oy stompclient.tac

该应用程序似乎可以正常启动,但是当它尝试分叉工作时,它只是挂起。通过“挂起”,我的意思是似乎从未要求子进程做任何事情,而父进程(称为 pool.apply_async())只是坐在那里等待响应返回。

我确信我用 Twisted + 多处理做了一些愚蠢的事情,但我真的希望有人可以向我解释我的方法中的缺陷。

提前致谢!

4

2 回答 2

12

由于您的工作调用和非工作调用之间的区别只是“-n”选项,因此问题似乎很可能是由守护进程(“-n”阻止发生)引起的。

在 POSIX 上,守护进程涉及的步骤之一是分叉并让父级退出。其中,这会导致您的代码在与评估 .tac 文件的进程不同的进程中运行。这也重新安排了在 .tac 文件中启动的进程的子/父关系 - 就像您的多处理进程池一样。

多处理池的进程从您启动的 twistd 进程的父进程开始。但是,当该进程作为守护进程的一部分退出时,它们的父进程将成为系统初始化进程。这可能会导致一些问题,尽管可能不是您描述的悬挂问题。可能还有其他类似的低级实现细节,这些细节通常允许多处理模块工作,但会被守护进程中断。

幸运的是,避免这种奇怪的交互应该很简单。Twisted 的服务 API 允许您在守护进程完成后运行代码。如果您使用这些 API,那么您可以将多处理模块的进程池的初始化延迟到守护进程之后,并有望避免该问题。这是一个可能看起来像的示例:

from twisted.application.service import Service

class MultiprocessingService(Service):
    def startService(self):
        self.pool = multiprocessing.Pool(processes=processes)

MultiprocessingService().setServiceParent(application)

现在,另外,您还可能会遇到与清理多处理模块的子进程相关的问题,或者可能与使用 Twisted 的进程创建 API、reactor.spawnProcess 创建的进程有关的问题。这是因为正确处理子进程的一部分通常涉及处理 SIGCHLD 信号。但是,Twisted 和 multiprocessing 在这方面不会合作,因此其中一个将收到所有孩子退出的通知,而另一个永远不会收到通知。如果您根本不使用 Twisted 的 API 来创建子进程,那么这对您来说可能没问题 - 但您可能需要检查以确保多处理模块尝试安装的任何信号处理程序实际上“获胜”并且没有得到被 Twisted 自己的处理程序取代。

于 2009-09-24T14:58:46.103 回答
0

一个可能的想法给你...

当以守护程序模式运行时,twistd 将关闭标准输入、标准输出和标准错误。您的客户对这些内容进行读取或写入吗?

于 2009-09-24T11:32:44.710 回答