6

我正在尝试使用多个进程创建一个程序,如果发生错误,我想干净地终止所有生成的进程。下面我为我认为我需要做的事情写了一些伪类型代码,但我不知道最好的方法是与所有发生错误的进程进行通信并且它们应该终止。

我想我应该为这类事情使用类,但我对 Python 还是很陌生,所以我只是想先了解一下基础知识。

#imports

exitFlag = True

# Function for threads to process   
def url_thread_worker( ):   
# while exitFlag:
    try:
        # do something
    except:
        # we've ran into a problem, we need to kill all the spawned processes and cleanly exit the program
        exitFlag = False

def processStarter( ):

    process_1 = multiprocessing.Process( name="Process-1", target=url_thread_worker, args=( ) ) 
    process_2 = multiprocessing.Process( name="Process-2", target=url_thread_worker, args=( ) ) 

    process_1.start()
    process_2.start()


if __name__ == '__main__':
     processStarter( )

提前致谢

4

2 回答 2

4

这是我的建议:

import multiprocessing
import threading
import time

def good_worker():   
    print "[GoodWorker] Starting"
    time.sleep(4)
    print "[GoodWorker] all good"

def bad_worker():
    print "[BadWorker] Starting"
    time.sleep(2)
    raise Exception("ups!")

class MyProcManager(object):
    def __init__(self):
        self.procs = []
        self.errors_flag = False
        self._threads = []
        self._lock = threading.Lock()

    def terminate_all(self):
        with self._lock:
            for p in self.procs:
                if p.is_alive():
                    print "Terminating %s" % p
                    p.terminate()

    def launch_proc(self, func, args=(), kwargs= {}):
        t = threading.Thread(target=self._proc_thread_runner,
                             args=(func, args, kwargs))
        self._threads.append(t)
        t.start()

    def _proc_thread_runner(self, func, args, kwargs):
        p = multiprocessing.Process(target=func, args=args, kwargs=kwargs)
        self.procs.append(p)
        p.start()
        while p.exitcode is None:
            p.join()
        if p.exitcode > 0:
            self.errors_flag = True
            self.terminate_all()

    def wait(self):
        for t in self._threads:
            t.join()

if __name__ == '__main__':
    proc_manager = MyProcManager()
    proc_manager.launch_proc(good_worker) 
    proc_manager.launch_proc(good_worker) 
    proc_manager.launch_proc(bad_worker) 
    proc_manager.wait()
    if proc_manager.errors_flag:
        print "Errors flag is set: some process crashed"
    else:
        print "Everything closed cleanly"

您需要为每个进程运行一个包装线程,等待其结束。当一个进程结束时,检查退出代码:如果 > 0,意味着它引发了一些未处理的异常。现在调用 terminate_all() 来关闭所有剩余的活动进程。包装线程也将完成,因为它们依赖于进程运行。

此外,在您的代码中,您可以随时随意调用 proc_manager.terminate_all() 。您可以在不同的线程中检查一些标志或类似的东西..

希望它对你的情况有好处。

PS:顺便说一句..在您的原始代码中,您执行了类似全局 exit_flag 的操作:您永远不能在多处理中拥有“全局”exit_flag,因为它根本不是全局的,因为您使用具有分隔内存空间的分隔进程。这仅适用于可以共享状态的线程环境。如果您在多处理中需要它,那么您必须在进程之间进行显式通信(管道和队列完成)或共享内存对象之类的东西

于 2013-11-14T16:51:22.407 回答
1

如果您希望您的子进程在父进程退出时自动终止;您可以使它们成为守护进程(.daemon=True在 之前设置.start()),即,如果父级检测到错误;它可能会退出——孩子们会得到照顾。

如果你想让孩子自己打扫卫生;您可以用作multiprocessing.Event()全局标志

import multiprocessing

def event_func(event):
    print '\t%r is waiting' % multiprocessing.current_process()
    event.wait()
    print '\t%r has woken up' % multiprocessing.current_process()

if __name__ == '__main__':
    event = multiprocessing.Event()

    processes = [multiprocessing.Process(target=event_func, args=(event,))
                 for i in range(5)]

    for p in processes:
        p.start()

    print 'main is sleeping'
    time.sleep(2)

    print 'main is setting event'
    event.set()

    for p in processes:
        p.join()
于 2013-11-14T18:07:26.897 回答