2

如何优雅地关闭 python thrift 服务器 TProcessPoolServer?我没有找到任何文档、示例或博客文章。以下是我迄今为止的经历。

我直接在命令行 ./thrift_service.py 上运行我的 thrift 服务器,而不是在主管之下。我正在使用 python 2.6 和 thrift 0.8.0。

我最初尝试过:

server = TProcessPoolServer(processor, transport, tfactory, pfactory)
try:
    server.serve()
finally:
    server.stop()

当我向父 python 进程发送 sigterm 时,我在输出中看到“终止”,该进程被终止,但它的子进程是孤立的并继续运行。

然后我偶然发现了thrift server tests,并尝试了:

import signal
def set_alarm(server):
    def clean_shutdown(signum, frame):
        for worker in server.workers:
            logging.error("Terminating worker: {0}".format(worker))
            worker.terminate()
        logging.error("Requesting server to stop()")
        try:
            server.stop()
        except (KeyboardInterrupt, SystemExit):
            pass
        except Exception as err:
            logging.exception(err)
    def logme(s, *args, **kwargs):
        logging.error(">>> {0} <<<".format(s))
        clean_shutdown(*args, **kwargs)
    signal.signal(signal.SIGALRM, clean_shutdown)
    signal.signal(signal.SIGHUP, clean_shutdown)
    signal.signal(signal.SIGINT, clean_shutdown)
    signal.signal(signal.SIGTERM, lambda x, y: logme("SIGTERM", x, y))
server = TProcessPoolServer(processor, transport, tfactory, pfactory)
set_alarm(server)
server.serve()

当我向父 python 进程发送 sigterm、sigalrm、sighup 或 sigint 时,服务器停止接受连接,但进程没有终止。

在输出中我看到:

ERROR:root:>>> SIGTERM <<<
ERROR:root:Terminating worker: <Process(Process-1, started daemon)>
ERROR:root:Terminating worker: <Process(Process-2, started daemon)>
ERROR:root:Terminating worker: <Process(Process-3, started daemon)>
ERROR:root:Terminating worker: <Process(Process-4, started daemon)>
ERROR:root:Terminating worker: <Process(Process-5, started daemon)>
ERROR:root:Requesting server to stop()

这是意料之中的,但是随后再次捕获到信号,进程不再处于启动状态,并且要求服务器停止。这部分发生了大约十次,然后就没有更多的输出了。

ERROR:root:>>> SIGTERM <<<
ERROR:root:Terminating worker: <Process(Process-1, unknown daemon)>
ERROR:root:Requesting server to stop()

有时,我会在多处理库中看到一个 AssertionError:

Traceback (most recent call last):
  File "/path/to/thrift_service.py", line 340, in clean_shutdown
    server.stop()
  File "/usr/local/lib/python2.6/dist-packages/thrift/server/TProcessPoolServer.py", line 123, in stop
    self.stopCondition.notify()
  File "/usr/lib/python2.6/multiprocessing/synchronize.py", line 223, in notify
    assert not self._wait_semaphore.acquire(False)
AssertionError
4

2 回答 2

2

我使用信号和它公开的 postForkCallback 向 python 中的 TProcessPoolServer 添加了正常关闭。一旦初始化,TProcessPoolServer 将在每个工作进程中调用您的 postForkCallback。这允许您设置信号处理程序并正常关闭。由于工作人员捕获 SystemExit 或 KeyboardInterruptException 异常,您可以为 SIGINT 设置一个处理程序,然后在完成清理后调用 sys.exit(0) ,这将导致工作人员关闭。

import signal
import sys

def setupHandlers():
    signal.signal(signal.SIGINT, handleSIGINT)
    #Optionally if you want to keep the current socket connection open and working
    #tell python to make system calls non-interruptable, which is probably what you want.
    signal.siginterrupt(signal.SIGINT, False)

def handleSIGINT(sig, frame):
     #clean up state or what ever is necessary
     sys.exit(0)

server = TProcessPoolServer(processor, transport, tfactory, pfactory)
server.setPostForkCallback(setupHandlers)

#Setup handlers in main process too
setupHandlers()

#Start server
server.start()

这样,每个生成的工作进程都会设置信号处理程序以正确处理正常关闭。在此示例中,我为主进程以及可能根据您的用例工作的工作人员设置了相同的处理程序,但如果需要,您可以轻松地为主进程定义不同的处理程序。请记住,处理程序将从每个进程的上下文中调用,因此您将无法在清理期间跨进程共享状态。

有关 signal.siginterrupt 的作用以及您可能需要它的原因的更多详细信息,请参见http://docs.python.org/library/signal.html 。

编辑:您需要使用 Crtl + C 将 SIGINT 信号发送到所有进程,或者如果它作为守护进程运行 kill -SIGINT [pids of all processes]

您可以使用 ps --ppid [parent pid] 轻松获取工作人员的 pid

于 2012-05-25T16:58:29.607 回答
0

程序启动后,我记录了主进程的进程号。然后根据ps --ppid,取回主进程的子进程,一一杀死。

我的服务的控制外壳脚本的代码:

function stop
{
    SERVER_PID=`cat logs/server.pid`
    SPIDS=`ps --ppid $SERVER_PID | awk '{if ($1!="PID") print $1}'`
    kill -9 $SERVER_PID
    for PID in $SPIDS
    do
        kill -9 $PID
    done
}
于 2021-04-13T07:34:26.713 回答