4

我正在运行一个备份脚本,该脚本启动子进程以通过 rsync 执行备份。但是我无法限制它一次启动的 rsync 数量。

这是我目前正在处理的代码:

print "active_children: ", multiprocessing.active_children()
print "active_children len: ", len(multiprocessing.active_children())
while len(multiprocessing.active_children()) > 49:
   sleep(2)
p = multiprocessing.Process(target=do_backup, args=(shash["NAME"],ip,shash["buTYPE"], ))
jobs.append(p)
p.start()

当我运行数百个 rsync 时,这最多显示一个孩子。这是实际启动 rsync 的代码(从 do_backup 函数内部),它command是一个包含 rsync 行的变量:

print command
subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)
return 1

如果我在 do_backup 函数中添加一个 sleep(x) ,它将在它睡觉时显示为一个活跃的孩子。此外,进程表显示 rsync 进程的 PPID 为 1。我由此假设 rsync 分裂并且不再是 python 的子进程,这允许我的子进程死亡,所以我不能再指望它了. 有谁知道如何让 python 孩子活着并被计数,直到 rsync 完成?

4

3 回答 3

6

多处理池

您是否考虑过使用multiprocessing.Pool的?这些允许您定义固定数量的工作进程,用于执行您想要的工作。这里的关键是固定数字,它可以让您完全控制要启动的 rsync 实例的数量。

查看我链接的文档中提供的示例,首先您声明一个Pool进程n,然后您可以决定是否map()apply()(与他们各自的_async()兄弟姐妹)您的工作到池中。

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes

    pool.apply_async(f, (10,))    # evaluate "f(10)" asynchronously
    ...
    pool.map(f, range(10))

这里的明显优势是你永远不会意外地对你的机器进行分叉炸弹,因为你只会产生请求的n进程。

运行你的 rsync

然后,您的进程生成代码将变为:

from multiprocessing import Pool

def do_backup(arg1, arg2, arg3, ...):
    # Do stuff

if __name__ == '__main__':
    # Start a Pool with 4 processes
    pool = Pool(processes=4)
    jobs = []

    for ... :
        # Run the function
        proc = pool.apply_async(func=do_backup, args=(shash["NAME"],ip,shash["buTYPE"], ))
        jobs.append(proc)

    # Wait for jobs to complete before exiting
    while(not all([p.ready() for p in jobs])):
        time.sleep(5)

    # Safely terminate the pool
    pool.close()
    pool.join()
于 2014-10-28T13:53:18.903 回答
2

让我们先澄清一些误解

我由此假设 rsync 分裂并且不再是 python 的子进程,这使我的子进程死亡,所以我不能再指望它了。

rsync确实“分裂”。在 UNIX 系统上,这称为fork

当一个进程分叉时,会创建一个子进程——python 的子进程rsync 也是如此。这个孩子独立于父母 - 并且同时(“同时”)执行。

一个进程可以管理它自己的子进程。有特定的系统调用,但在谈论 python 时有点跑题了,它有自己的高级接口

如果您查看subprocess.Popen的文档,您会注意到它根本不是函数调用:它是一个类。通过调用它,您将创建该类的一个实例——一个Popen 对象。这样的对象有多种方法。特别是,wait将允许您阻止父进程(python),直到子进程终止。


考虑到这一点,让我们看一下您的代码并稍微简化一下:

p = multiprocessing.Process(target=do_backup, ...)

在这里,您实际上是在分叉并创建一个子进程。该进程是另一个 python 解释器(与所有multiprocessing进程一样),并将执行该do_backup函数。

def do_backup()
    subprocess.Popen("rsync ...", ...)

在这里,你又分叉。您将创建另一个进程 ( rsync),并让它“在后台”运行,因为您不喜欢wait它。


清除所有这些后,我希望您可以看到现有代码的前进方向。如果您想降低其复杂性,我建议您检查并调整 JoErNanO 的答案,该答案可重复使用multiprocessing.Pool以自动跟踪流程。

无论您决定采用哪种方式,都应避免Popen创建rsync流程的分叉 - 因为这会不必要地创建另一个流程。取而代之的是 check os.execv,它将当前进程替换为另一个

于 2014-10-28T14:20:29.457 回答
0

这不是多线程,而是多处理。我假设您使用的是 Unix 系统,如果您正在使用,rsync尽管我相信它可以在 Windows 系统上运行。为了控制生成的子进程的死亡,您必须使用fork它们。

这里有一个关于在 Python 中执行此操作的好问题。

于 2014-10-28T13:56:27.347 回答