我有一个包含 4 个节点和一个主服务器的集群。Master 调度的作业可能需要 30 秒到 15 分钟才能结束。
节点正在监听 aSocketServer.TCPServer
并且在主节点中,我打开一个连接并等待作业结束。
def run(nodes, args):
pool = multiprocessing.Pool(len(nodes))
return pool.map(load_job, zip(nodes, args))
该load_job
函数发送数据,socket.sendall
然后立即使用socket.recv
(数据需要很长时间才能到达)。
该程序运行良好,直到运行大约 200 或 300 个这些作业。当它中断时,socket.recv
接收一个空字符串并且在我终止节点进程并再次运行它们之前无法运行任何更多作业。
我应该如何等待数据到来?此外,错误处理pool
非常差,因为它保存了来自另一个进程的错误并且没有正确的回溯显示,而且这个错误并不常见重复......
编辑:现在我认为这个问题与套接字无关:
经过一些研究,看起来我的节点正在为许多进程打开方式(因为它们也在 a 中运行它们的工作multiprocessing.Pool
)并且不知何故它们没有被关闭!
我发现这些 SO 问题(此处和此处)在谈论在守护进程中使用multiprocessing
时的僵尸进程(正是我的情况!)。
我需要进一步了解这个问题,但现在我正在杀死节点并在一段时间后恢复它们。