7

我目前正在使用 python 中的标准多处理来生成一堆将无限期运行的进程。我并不特别关心性能。每个线程只是在观察文件系统上的不同变化,并在文件被修改时采取适当的行动。

目前,我有一个在 Linux 中可以满足我需求的解决方案。我有一个函数和参数字典,如下所示:

 job_dict['func1'] = {'target': func1, 'args': (args,)}

对于每一个,我创建一个流程:

 import multiprocessing
 for k in job_dict.keys():
     jobs[k] = multiprocessing.Process(target=job_dict[k]['target'],
                                       args=job_dict[k]['args'])

有了这个,我可以跟踪每个正在运行的任务,并且在必要时重新启动因任何原因崩溃的作业。

这在 Windows 中不起作用。我使用的许多函数都是包装器,使用各种functools函数,并且我收到有关无法序列化函数的消息(请参阅多处理和 dill 可以一起做什么?)。我还没有弄清楚为什么我在 Linux 中没有出现此错误,但在 Windows 中却出现了。

如果我在 Windows 中启动进程之前导入dill,我不会收到序列化错误。然而,这些进程实际上并没有做任何事情。我不知道为什么。

然后我切换到 中的多处理实现pathos,但没有在标准模块中找到简单Process类的模拟。multiprocessing我能够使用pathos.pools.ThreadPool. 我敢肯定,这不是 map 的预期用途,但它启动了所有线程,并且它们在 Windows 中运行:

import pathos
tp = pathos.pools.ThreadPool()
for k in job_dict.keys():
    tp.uimap(job_dict[k]['target'], job_dict[k]['args'])

但是,现在我不确定如何监视线程​​是否仍然处于活动状态,我正在寻找它以便我可以重新启动由于某种原因而崩溃的线程。有什么建议么?

4

1 回答 1

7

我是pathosdill作者。该类Process深埋在pathosat 中pathos.helpers.mp.process.Process,它mp本身就是multiprocessing库的实际分支。里面的所有东西都multiprocessing应该可以从那里访问。

要知道的另一件事pathos是,它会pool为您保持活动状态,直到您将其从保持状态中删除。这有助于减少创建“新”池的开销。要删除池,请执行以下操作:

>>> # create
>>> p = pathos.pools.ProcessPool()
>>> # remove
>>> p.clear()

然而,没有这样的机制Process

因为multiprocessing,windows 与 Linux 和 Macintosh 不同……因为 windows 在 linux 上没有适当fork的相似之处……linux 可以跨进程共享对象,而在 windows 上没有共享……它基本上是一个完全独立的新进程创建……因此序列化对象必须更好地传递给另一个进程——就像你将对象发送到另一台计算机一样。在 linux 上,您必须这样做才能获得相同的行为:

def check(obj, *args, **kwds):
    """check pickling of an object across another process"""
    import subprocess
    fail = True
    try:
        _x = dill.dumps(x, *args, **kwds)
        fail = False
    finally:
        if fail:
            print "DUMP FAILED"
    msg = "python -c import dill; print dill.loads(%s)" % repr(_x)
    print "SUCCESS" if not subprocess.call(msg.split(None,2)) else "LOAD FAILED"
于 2015-07-30T20:29:45.070 回答