
I am currently playing around with multiprocessing and queues. I have written a piece of code to export data from mongoDB, map it into a relational (flat) structure, convert all values to strings and insert them into mysql.

Each of these steps is submitted as a process and given import/export queues, except for the mongoDB export, which is handled in the parent.

As you will see below, I use queues, and the child processes terminate themselves when they read "None" from the queue. The problem I currently have is that if a child process runs into an unhandled exception, the parent does not recognize it and the rest just keeps running. What I would like to happen is that the whole shebang quits and, at best, the child's error is re-raised.

I have two questions:

  1. How do I detect the child error in the parent?
  2. How do I kill my child processes after detecting the error (best practice)? I realize that putting "None" into the queue to kill the child is pretty dirty.

I am using Python 2.7.

Here are the essential parts of my code:

# Establish communication queues
mongo_input_result_q = multiprocessing.Queue()
mapper_result_q = multiprocessing.Queue()
converter_result_q = multiprocessing.Queue()

[...]

    # create child processes
    # all processes generated here are subclasses of "multiprocessing.Process"

    # create mapper
    mappers = [mongo_relational_mapper.MongoRelationalMapper(mongo_input_result_q, mapper_result_q, columns, 1000)
               for i in range(10)]

    # create datatype converter, converts everything to str
    converters = [datatype_converter.DatatypeConverter(mapper_result_q, converter_result_q, 'str', 1000)
                  for i in range(10)]

    # create mysql writer
    # I create a list of writers. Currently only one,
    # but I have the option to parallelize it further
    writers = [mysql_inserter.MySqlWriter(mysql_host, mysql_user, mysql_passwd, mysql_schema, converter_result_q
               , columns, 'w_'+mysql_table, 1000) for i in range(1)]

    # starting mapper
    for mapper in mappers:
        mapper.start()
    time.sleep(1)

    # starting converter
    for converter in converters:
        converter.start()

    # starting writer
    for writer in writers:
        writer.start()

[... initialize the mongoDB connection ...]

    # put each dataset read to queue for the mapper
    for row in mongo_collection.find({inc_column: {"$gte": start}}):
        mongo_input_result_q.put(row)
        count += 1
        if count % log_counter == 0:
            print 'Mongo Reader' + " " + str(count)
    print "MongoReader done"

    # Processes are terminated when they read "None" object from queue
    # now that reading is finished, put None for each mapper in the queue so they terminate themselves
    # the same for all followup processes
    for mapper in mappers:
        mongo_input_result_q.put(None)
    for mapper in mappers:
        mapper.join()
    for converter in converters:
        mapper_result_q.put(None)
    for converter in converters:
        converter.join()
    for writer in writers:
        converter_result_q.put(None)
    for writer in writers:
        writer.join()

4 Answers


Why not let the Process take care of its own exceptions, like this:

from __future__ import print_function
import multiprocessing as mp
import traceback

class Process(mp.Process):
    def __init__(self, *args, **kwargs):
        mp.Process.__init__(self, *args, **kwargs)
        self._pconn, self._cconn = mp.Pipe()
        self._exception = None

    def run(self):
        try:
            mp.Process.run(self)
            self._cconn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._cconn.send((e, tb))
            # raise e  # You can still raise this exception if you need to

    @property
    def exception(self):
        if self._pconn.poll():
            self._exception = self._pconn.recv()
        return self._exception

Now you have both the error and the traceback at hand:

def target():
    raise ValueError('Something went wrong...')

p = Process(target = target)
p.start()
p.join()

if p.exception:
    error, traceback = p.exception
    print(traceback)

Regards, Marek

answered 2015-11-08T22:50:33.943

I don't know of a standard practice, but what I've found is that, to make multiprocessing reliable, I design the methods/classes/etc. specifically to work with multiprocessing. Otherwise you never really know what is going on on the other side (unless I've missed some mechanism for this).

Specifically, what I do is (a minimal sketch follows this list):

  • Subclass multiprocessing.Process or make functions that specifically support multiprocessing (wrapping functions you don't have control over, if necessary)
  • Always provide a shared error multiprocessing.Queue from the main process to each worker process
  • Enclose the entire run code in try: ... except Exception as e. Then, when something unexpected happens, send an error package with:
    • the process id that died
    • the exception with its original context (check here). The original context is really important if you want to log useful information in the main process.
  • Of course, handle expected problems as normal within the worker's normal operation
  • (Similar to what you said already) Assuming a long-running process, wrap the running code (inside the try/catch-all) with a loop:
    • define a stop token in the class or for the function
    • when the main process wants a worker to stop, just send the stop token; to stop everyone, send enough tokens for all the processes
    • the wrapping loop checks the input q for the token, or whatever other input you want
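
As a rough illustration, here is a minimal sketch of this pattern (my illustration, not code from the answer; the STOP_TOKEN value and the queue layout are arbitrary choices):

import multiprocessing
import traceback

STOP_TOKEN = "STOP"  # hypothetical sentinel; any recognizable value works


def worker(input_q, error_q):
    # a function written specifically to support multiprocessing
    try:
        # the wrapping loop checks the input queue for the stop token
        while True:
            item = input_q.get()
            if item == STOP_TOKEN:
                break
            # ... normal operation; handle *expected* issues right here ...
    except Exception:
        # error package: the id of the dying process plus the exception's
        # original context, formatted as a traceback string
        error_q.put((multiprocessing.current_process().pid,
                     traceback.format_exc()))


if __name__ == '__main__':
    input_q, error_q = multiprocessing.Queue(), multiprocessing.Queue()
    workers = [multiprocessing.Process(target=worker, args=(input_q, error_q))
               for _ in range(4)]
    for w in workers:
        w.start()
    # ... feed input_q with real work here ...
    for _ in workers:           # to stop everyone, send one token per process
        input_q.put(STOP_TOKEN)
    for w in workers:
        w.join()
    while not error_q.empty():  # the parent now knows who died and why
        pid, tb = error_q.get()
        print(tb)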

The end result is worker processes that can survive for a long time and that can let you know what is happening when something goes wrong. They will die quietly, since you can handle whatever you need to after the catch-all exception, and you will also know when you need to restart a worker.

Again, I have just come to this pattern through trial and error, so I don't know how standard it is. Does that help with what you are asking for?

answered 2013-11-12T12:54:11.123

@mrkwjc's solution is simple, easy to understand and easy to implement, but it has one disadvantage: when we have a few processes and want to stop all of them as soon as any single process has an error, we need to wait until all processes are finished before we can check p.exception. Below is code which fixes this problem (i.e. when one child has an error, we also terminate the other child):

import multiprocessing
import traceback

from time import sleep


class Process(multiprocessing.Process):
    """
    Class which returns child Exceptions to Parent.
    https://stackoverflow.com/a/33599967/4992248
    """

    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        self._parent_conn, self._child_conn = multiprocessing.Pipe()
        self._exception = None

    def run(self):
        try:
            multiprocessing.Process.run(self)
            self._child_conn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._child_conn.send((e, tb))
            # raise e  # You can still raise this exception if you need to

    @property
    def exception(self):
        if self._parent_conn.poll():
            self._exception = self._parent_conn.recv()
        return self._exception


class Task_1:
    def do_something(self, queue):
        queue.put(dict(users=2))


class Task_2:
    def do_something(self, queue):
        queue.put(dict(users=5))


def main():
    try:
        task_1 = Task_1()
        task_2 = Task_2()

        # Example of multiprocessing which is used:
        # https://eli.thegreenplace.net/2012/01/16/python-parallelizing-cpu-bound-tasks-with-multiprocessing/
        task_1_queue = multiprocessing.Queue()
        task_2_queue = multiprocessing.Queue()

        task_1_process = Process(
            target=task_1.do_something,
            kwargs=dict(queue=task_1_queue))

        task_2_process = Process(
            target=task_2.do_something,
            kwargs=dict(queue=task_2_queue))

        task_1_process.start()
        task_2_process.start()

        while task_1_process.is_alive() or task_2_process.is_alive():
            sleep(10)

            if task_1_process.exception:
                error, task_1_traceback = task_1_process.exception

                # Do not wait until task_2 is finished
                task_2_process.terminate()

                raise ChildProcessError(task_1_traceback)

            if task_2_process.exception:
                error, task_2_traceback = task_2_process.exception

                # Do not wait until task_1 is finished
                task_1_process.terminate()

                raise ChildProcessError(task_2_traceback)

        task_1_process.join()
        task_2_process.join()

        task_1_results = task_1_queue.get()
        task_2_results = task_2_queue.get()

        task_1_users = task_1_results['users']
        task_2_users = task_2_results['users']

    except Exception:
        # Here usually I send email notification with error.
        print('traceback:', traceback.format_exc())


if __name__ == "__main__":
    main()
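
One design note: the parent loop above polls only every 10 seconds, so a failure in either child is detected with up to that much delay; a shorter sleep interval makes the shutdown more responsive at the cost of slightly more busy-waiting.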
answered 2019-09-23T10:49:02.253

Thanks to kobejohn, I have found a solution which is nice and stable.

  1. I have created a subclass of multiprocessing.Process which implements some functions and overrides the run() method to wrap a new saferun method in a try-catch block. This class requires a feedback_queue for initialization, which is used to report info, debug and error messages back to the parent. The log methods in the class are wrappers for the globally defined log functions of the package (a sketch of these helpers follows after the list):

    class EtlStepProcess(multiprocessing.Process):
    
        def __init__(self, feedback_queue):
            multiprocessing.Process.__init__(self)
            self.feedback_queue = feedback_queue
    
        def log_info(self, message):
            log_info(self.feedback_queue, message, self.name)
    
        def log_debug(self, message):
            log_debug(self.feedback_queue, message, self.name)
    
        def log_error(self, err):
            log_error(self.feedback_queue, err, self.name)
    
        def saferun(self):
            """Method to be run in sub-process; can be overridden in sub-class"""
            if self._target:
                self._target(*self._args, **self._kwargs)
    
        def run(self):
            try:
                self.saferun()
            except Exception as e:
                self.log_error(e)
                raise e
            return
    
  2. I have inherited all other process steps from EtlStepProcess. The code to be run is implemented in the saferun() method rather than in run(). This way I do not have to add a try-catch block around it, since this is already done by the run() method. Example:

    class MySqlWriter(EtlStepProcess):
    
        def __init__(self, mysql_host, mysql_user, mysql_passwd, mysql_schema, mysql_table, columns, commit_count,
                     input_queue, feedback_queue):
            EtlStepProcess.__init__(self, feedback_queue)
            self.mysql_host = mysql_host
            self.mysql_user = mysql_user
            self.mysql_passwd = mysql_passwd
            self.mysql_schema = mysql_schema
            self.mysql_table = mysql_table
            self.columns = columns
            self.commit_count = commit_count
            self.input_queue = input_queue
    
        def saferun(self):
            self.log_info(self.name + " started")
            #create mysql connection
            engine = sqlalchemy.create_engine('mysql://' + self.mysql_user + ':' + self.mysql_passwd + '@' + self.mysql_host + '/' + self.mysql_schema)
            meta = sqlalchemy.MetaData()
            table = sqlalchemy.Table(self.mysql_table, meta, autoload=True, autoload_with=engine)
            connection = engine.connect()
            try:
                self.log_info("start MySQL insert")
                counter = 0
                row_list = []
                while True:
                    next_row = self.input_queue.get()
                    if isinstance(next_row, Terminator):
                        if counter % self.commit_count != 0:
                            connection.execute(table.insert(), row_list)
                        # Poison pill means we should exit
                        break
                    row_list.append(next_row)
                    counter += 1
                    if counter % self.commit_count == 0:
                        connection.execute(table.insert(), row_list)
                        del row_list[:]
                        self.log_debug(self.name + ' ' + str(counter))
    
            finally:
                connection.close()
            return
    
  3. In my main file, I submit a Process that does all the work and give it a feedback_queue. This process starts all the steps and then reads from mongoDB and puts values into the initial queue. My main process listens on the feedback queue and prints all log messages. If it receives an error log, it prints the error and terminates its child, which in return terminates all of its children before dying.

    if __name__ == '__main__':
        feedback_q = multiprocessing.Queue()
        p = multiprocessing.Process(target=mongo_python_export, args=(feedback_q,))
        p.start()

        while p.is_alive():
            fb = feedback_q.get()
            if fb["type"] == "error":
                p.terminate()
                print "ERROR in " + fb["process"] + "\n"
                for child in multiprocessing.active_children():
                    child.terminate()
            else:
                print datetime.datetime.fromtimestamp(fb["timestamp"]).strftime('%Y-%m-%d %H:%M:%S') + " " + \
                    fb["process"] + ": " + fb["message"]

        p.join()
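
The globally defined log functions wrapped by EtlStepProcess are not shown. Here is a minimal sketch of what they might look like, inferred from the dict keys ("type", "timestamp", "process", "message") that the main loop reads; the exact implementation is an assumption, not the author's code:

    import time

    # Hypothetical package-level log helpers (not shown in the answer).
    # They put dicts onto the feedback queue in the format consumed above.
    def _log(feedback_queue, msg_type, message, process_name):
        feedback_queue.put({"type": msg_type,
                            "timestamp": time.time(),
                            "process": process_name,
                            "message": message})

    def log_info(feedback_queue, message, process_name):
        _log(feedback_queue, "info", message, process_name)

    def log_debug(feedback_queue, message, process_name):
        _log(feedback_queue, "debug", message, process_name)

    def log_error(feedback_queue, err, process_name):
        _log(feedback_queue, "error", str(err), process_name)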
    

I thought about making a module out of it and putting it up on github, but I have to do some cleaning up and commenting first.

answered 2013-11-13T10:21:29.893