-1

如何在 Python 3 中使用 pathos(而不是 multiprocessing 模块)实现非守护进程池?

更具体地说,我指的是这个问题:Python Process Pool non-daemonic?

该问题的答案通过 multiprocessing 模块实现了非守护进程。遗憾的是,该模块无法序列化(pickle)lambda 函数等对象,而 pathos 在 Python 2 中可以做到:

#import multiprocessing
#import multiprocessing.pool
import pathos

#class NoDaemonProcess(multiprocessing.Process):
class NoDaemonProcess(pathos.multiprocessing.Pool.Process):
    """Worker process whose ``daemon`` flag always reads as False.

    Any assignment to ``daemon`` is silently discarded, so whoever creates
    the process cannot turn it into a daemon (daemonic processes may not
    start children of their own).

    NOTE: Python 2 pathos only -- per the question, on Python 3
    ``pathos.multiprocessing.Pool.Process`` is a function, not a class, so
    it cannot be used as a base class there.
    """

    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        # Deliberately ignore the requested value.
        pass

#class NoDaemonPool(multiprocessing.pool.Pool):
class NoDaemonPool(pathos.multiprocessing.Pool):
    """pathos pool whose worker class is replaced by NoDaemonProcess (Python 2)."""
    # Overrides the ``Process`` class attribute so that, presumably, the pool
    # builds its workers from NoDaemonProcess instead of the default class.
    # NOTE(review): depends on the Python 2 pathos API; per the question this
    # inheritance no longer works on Python 3.
    Process = NoDaemonProcess

def myproc(args):
    """Map an identity lambda over ``range(i)`` in a nested pathos pool.

    ``args`` is an ``(i, max_workers)`` pair: ``i`` sets the size of the work
    list and ``max_workers`` the size of the inner ProcessPool. Both the
    input list and the mapped result are printed; ``i`` is returned.
    """
    i, max_workers = args
    inner_pool = pathos.pools.ProcessPool(max_workers)
    # A lambda on purpose: per the question, pathos can pickle it where plain
    # multiprocessing cannot.
    identity = lambda x : x
    values = list(range(i))
    mapped = inner_pool.map(identity, values)
    print("myproc", values, mapped)
    return i

if __name__ == "__main__":
    # Drive the pools only from the main process: under the spawn start method
    # (e.g. Windows) children re-import this module and must not build pools.
    max_workers = [2, 1]  # [outer NoDaemonPool size, inner ProcessPool size]
    executor = NoDaemonPool(max_workers[0])
    # Daemonic alternative for comparison: pathos.multiprocessing.Pool(max_workers[0])
    l_args = [(i, max_workers[1]) for i in range(10)]
    print(executor.map(myproc, l_args))
    # Shut the outer workers down explicitly rather than at interpreter exit.
    executor.close()
    executor.join()

输出:

('myproc', [], [])
('myproc', [0, 1], [0, 1])
('myproc', [0], [0])
('myproc', [0, 1, 2], [0, 1, 2])
('myproc', [0, 1, 2, 3], [0, 1, 2, 3])
('myproc', [0, 1, 2, 3, 4, 5], [0, 1, 2, 3, 4, 5])
('myproc', [0, 1, 2, 3, 4], [0, 1, 2, 3, 4])
('myproc', [0, 1, 2, 3, 4, 5, 6], [0, 1, 2, 3, 4, 5, 6])
('myproc', [0, 1, 2, 3, 4, 5, 6, 7], [0, 1, 2, 3, 4, 5, 6, 7])
('myproc', [0, 1, 2, 3, 4, 5, 6, 7, 8], [0, 1, 2, 3, 4, 5, 6, 7, 8])
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

在 Python 3 中,pathos 模块相对于 Python 2 发生了变化,例如 pathos.multiprocessing.Pool.Process 不再是一个类,而是一个函数,因此不能再用于继承(见上文)。是我遗漏了什么相关文档吗?

如何使上述代码在 Python 3 中的 pathos 中工作?

针对上述具体示例,一种变通方法是退回到基于 multiprocessing 的 NoDaemonPool 实现,只在守护子进程中使用 pathos:

import multiprocessing
import multiprocessing.pool
import pathos

class NoDaemonProcess(multiprocessing.Process):
    """multiprocessing Process that always reports itself as non-daemonic.

    Writes to ``daemon`` are silently discarded, so a pool that tries to mark
    its workers as daemons cannot; the worker may therefore start child
    processes of its own. (The question also tried deriving from
    ``pathos.multiprocessing.Pool.Process``, which is not a class on Python 3.)
    """

    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        # Deliberately ignore the requested value.
        pass

class NoDaemonPool(multiprocessing.pool.Pool):
    """multiprocessing Pool whose workers are NoDaemonProcess instances.

    Python < 3.8 creates workers with ``self.Process(target=..., args=...)``.
    Python >= 3.8 instead calls a static ``Process(ctx, target=..., args=...)``
    hook, passing the pool's context as the first positional argument. A
    plain ``Process = NoDaemonProcess`` alias would receive that context as
    the ``group`` argument on 3.8+ and fail, so both call shapes are accepted.
    """
    # The question also tried deriving from pathos.multiprocessing.Pool here.

    @staticmethod
    def Process(*args, **kwds):
        # Drop the leading context object that Python >= 3.8 passes positionally.
        if args and isinstance(args[0], multiprocessing.context.BaseContext):
            args = args[1:]
        return NoDaemonProcess(*args, **kwds)

def myproc(args):
    """Map an identity lambda over ``range(i)`` in a nested pathos pool.

    ``args`` is an ``(i, max_workers)`` pair: ``i`` sets the size of the work
    list and ``max_workers`` the size of the inner (daemonic) pathos
    ProcessPool. Prints the input list and mapped result, then returns ``i``.
    """
    i, max_workers = args
    inner_pool = pathos.pools.ProcessPool(max_workers)
    # A lambda on purpose: pathos can pickle it, plain multiprocessing cannot.
    identity = lambda x : x
    values = list(range(i))
    mapped = inner_pool.map(identity, values)
    print("myproc", values, mapped)
    return i

if __name__ == "__main__":
    # Drive the pools only from the main process: under the spawn start method
    # (e.g. Windows) children re-import this module and must not build pools.
    max_workers = [2, 1]  # [outer NoDaemonPool size, inner ProcessPool size]
    executor = NoDaemonPool(max_workers[0])
    # Daemonic alternative for comparison: pathos.multiprocessing.Pool(max_workers[0])
    l_args = [(i, max_workers[1]) for i in range(10)]
    print(executor.map(myproc, l_args))
    # Shut the outer workers down explicitly rather than at interpreter exit.
    executor.close()
    executor.join()

然而,这种变通方法并不是真正的解决方案,因为 (i) 它同时导入了 pathos 和 multiprocessing,更重要的是 (ii) 外层进程池无法序列化(pickle)lambda 函数,例如当 myproc 被定义为如下形式时:

myproc = lambda x : x

非常感谢!此致,塞巴斯蒂安

4

1 回答 1

0

我刚刚自己找到了适用于 Python 3 的答案,它沿用了以下问题中的最初思路:

Python 进程池非守护进程?

并按照以下问题中的建议,保留了干净的进程错误处理:

如何运行嵌套的、分层的 pathos 多处理地图?

import pathos
import signal
import sys
import os
import time

# redefine process pool via inheritance
import multiprocess.context as context
class NoDaemonProcess(context.Process):
    """multiprocess Process that always reports itself as non-daemonic.

    Assignments to ``daemon`` (such as a pool marking its workers as daemons)
    are silently discarded, so the worker may start child processes of its
    own -- which daemonic processes are not allowed to do.
    """

    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        # Deliberately ignore the requested value.
        pass

class NoDaemonPool(pathos.multiprocessing.Pool):
    """pathos Pool whose workers are created as NoDaemonProcess instances.

    Python < 3.8 builds workers with ``self.Process(target=..., args=...)``.
    Python >= 3.8 calls a static ``Process(ctx, target=..., args=...)`` hook
    with the pool's context as the first positional argument. With a bound
    ``Process(self, *args)`` that context would arrive as the process's
    ``group`` argument and fail, so both call shapes are accepted here.
    """

    @staticmethod
    def Process(*args, **kwds):
        # Drop the leading context object that Python >= 3.8 passes positionally.
        if args and isinstance(args[0], context.BaseContext):
            args = args[1:]
        return NoDaemonProcess(*args, **kwds)

def get_pid_i(x):
    """Return the PID of the process running this call; ``x`` is ignored.

    Mapped over a pool, it reports which worker processes executed tasks.
    """
    current_pid = os.getpid()
    return current_pid
def hard_kill_pool(pid_is, pool):
    """Send SIGINT (like Ctrl+C) to each PID in ``pid_is``, then terminate ``pool``.

    A worker that has already exited makes ``os.kill`` raise
    ProcessLookupError; it is skipped so the remaining PIDs are still
    signalled. ``pool.terminate()`` always runs, even if signalling fails
    with another error.

    Args:
        pid_is: iterable of worker process IDs to interrupt.
        pool: pool object exposing ``terminate()``.
    """
    try:
        for pid_i in pid_is:
            try:
                os.kill(pid_i, signal.SIGINT)  # sending Ctrl+C
            except ProcessLookupError:
                # Worker already gone; nothing to interrupt.
                pass
    finally:
        pool.terminate()

def myproc(args):
    """Map an identity lambda over ``range(i)`` in an inner pathos pool and return ``i``.

    ``args`` is an ``(i, max_workers)`` pair. The inner pool's ``amap`` result
    is polled once a second, printing a progress line every 30 polls, until
    it is ready. The pool is then closed and joined. On Ctrl+C or any other
    exception, the inner workers are sent SIGINT, the pool is terminated and
    the exception is re-raised.
    """
    i, max_workers = args
    l_args = list(range(i))
    mysubproc = lambda x: x  # a lambda on purpose: pathos can pickle it
    pool = pathos.pools.ProcessPool(max_workers)
    # NOTE(review): presumably pathos may hand back a cached, already-closed
    # pool; force a fresh one (see the linked nested-pathos question).
    pool.restart(force=True)
    pid_is = pool.map(get_pid_i, range(max_workers))
    try:
        async_result = pool.amap(mysubproc, l_args)
        counter_i = 0
        while not async_result.ready():
            time.sleep(1)
            if counter_i % 30 == 0:
                print('Waiting for children running in pool.amap() in myproc( {} ) with PIDs: {}'.format(i, pid_is))
            counter_i += 1
        async_result.get()  # surfaces any exception raised in the workers
        pool.close()
        pool.join()
    except KeyboardInterrupt:
        # .format() belongs on the string, not on print()'s None return value.
        print('Ctrl+C received in myproc( {} ), attempting to terminate pool...'.format(i))
        hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise
    except Exception:
        print('Attempting to close parallel after exception: {} in myproc( {} )'.format(sys.exc_info()[0], i))
        hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise
    return i

if __name__ == "__main__":
    # Drive the pools only from the main process: under the spawn start method
    # children re-import this module and must not build pools themselves.
    # To try the lambda case from the question, define ``myproc = lambda x: x`` here.
    max_workers = [2, 1]  # [outer NoDaemonPool size, inner ProcessPool size]
    # One (i, inner_pool_size) task per outer job; previously never defined.
    l_args = [(i, max_workers[1]) for i in range(10)]
    pool = NoDaemonPool(max_workers[0])
    pid_is = pool.map(get_pid_i, range(max_workers[0]))
    try:
        results = pool.map_async(myproc, l_args)
        counter_i = 0
        while not results.ready():
            if counter_i % 30 == 0:
                print('Waiting for children running in pool.map_async() in main() with PIDs: {}'.format(pid_is))
            time.sleep(2)
            counter_i += 1
        results = results.get()
        pool.close()
        pool.join()
    except KeyboardInterrupt:
        print('Ctrl+C received in main(), attempting to terminate pool...')
        hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise
    except Exception:
        print('Attempting to close parallel after exception: {} in main()'.format(sys.exc_info()[0]))
        hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise

输出:

Waiting for children running in pool.map_async() in main() with PIDs: [15015, 15014]
Waiting for children running in pool.amap() in myproc( 2 ) with PIDs: [15019]
Waiting for children running in pool.amap() in myproc( 1 ) with PIDs: [15020]
Waiting for children running in pool.amap() in myproc( 3 ) with PIDs: [15021]
Waiting for children running in pool.amap() in myproc( 4 ) with PIDs: [15022]
Waiting for children running in pool.amap() in myproc( 6 ) with PIDs: [15024]
Waiting for children running in pool.amap() in myproc( 5 ) with PIDs: [15023]
Waiting for children running in pool.amap() in myproc( 7 ) with PIDs: [15025]
Waiting for children running in pool.amap() in myproc( 8 ) with PIDs: [15026]
Waiting for children running in pool.amap() in myproc( 9 ) with PIDs: [15028]

我使用了以下模块版本:

python                    3.6.0                         0
pathos                    0.2.1                    py36_1    conda-forge
multiprocess              0.70.4                   py36_0    http://conda.binstar.org/omnia
dill                      0.2.7.1          py36h644ae93_0  
pox                       0.2.3                    py36_0    conda-forge
ppft                      1.6.4.7.1                py36_0    conda-forge
six                       1.10.0                   py36_0  
于 2017-12-01T22:10:17.800 回答