27

我正在尝试创建一个类,它可以运行一个单独的进程来完成一些需要很长时间的工作,从一个主模块启动一堆这些,然后等待它们全部完成。我想启动一次流程,然后继续为它们提供要做的事情,而不是创建和破坏流程。例如,也许我有 10 台服务器运行 dd 命令,然后我希望它们都 scp 文件等。

我的最终目标是为每个系统创建一个类,以跟踪与它相关的系统的信息,例如 IP 地址、日志、运行时等。但是该类必须能够启动系统命令然后返回在系统命令运行时执行返回给调用者,以便稍后跟进系统命令的结果。

我的尝试失败了,因为我无法通过管道将类的实例方法通过 pickle 发送到子进程。那些是不可腌制的。因此,我尝试以各种方式修复它,但我无法弄清楚。如何修补我的代码来做到这一点?如果你不能发送任何有用的东西,多处理有什么好处?

是否有与类实例一起使用多处理的良好文档?我可以让多处理模块工作的唯一方法是使用简单的功能。每次在类实例中使用它的尝试都失败了。也许我应该改为传递事件?我还不明白该怎么做。

import multiprocessing
import sys
import re

class ProcessWorker(multiprocessing.Process):
    """
    This class runs as a separate process to execute worker's commands in parallel
    Once launched, it remains running, monitoring the task queue, until "None" is sent
    """

    def __init__(self, task_q, result_q):
        multiprocessing.Process.__init__(self)
        self.task_q = task_q
        self.result_q = result_q
        return

    def run(self):
        """
        Overloaded function provided by multiprocessing.Process.  Called upon start() signal
        """
        proc_name = self.name
        print '%s: Launched' % (proc_name)
        while True:
            next_task_list = self.task_q.get()
            if next_task is None:
                # Poison pill means shutdown
                print '%s: Exiting' % (proc_name)
                self.task_q.task_done()
                break
            next_task = next_task_list[0]
            print '%s: %s' % (proc_name, next_task)
            args = next_task_list[1]
            kwargs = next_task_list[2]
            answer = next_task(*args, **kwargs)
            self.task_q.task_done()
            self.result_q.put(answer)
        return
# End of ProcessWorker class

class Worker(object):
    """
    Launches a child process to run commands from derived classes in separate processes,
    which sit and listen for something to do
    This base class is called by each derived worker
    """
    def __init__(self, config, index=None):
        self.config = config
        self.index = index

        # Launce the ProcessWorker for anything that has an index value
        if self.index is not None:
            self.task_q = multiprocessing.JoinableQueue()
            self.result_q = multiprocessing.Queue()

            self.process_worker = ProcessWorker(self.task_q, self.result_q)
            self.process_worker.start()
            print "Got here"
            # Process should be running and listening for functions to execute
        return

    def enqueue_process(target):  # No self, since it is a decorator
        """
        Used to place an command target from this class object into the task_q
        NOTE: Any function decorated with this must use fetch_results() to get the
        target task's result value
        """
        def wrapper(self, *args, **kwargs):
            self.task_q.put([target, args, kwargs]) # FAIL: target is a class instance method and can't be pickled!
        return wrapper

    def fetch_results(self):
        """
        After all processes have been spawned by multiple modules, this command
        is called on each one to retreive the results of the call.
        This blocks until the execution of the item in the queue is complete
        """
        self.task_q.join()                          # Wait for it to to finish
        return self.result_q.get()                  # Return the result

    @enqueue_process
    def run_long_command(self, command):
        print "I am running number % as process "%number, self.name

        # In here, I will launch a subprocess to run a  long-running system command
        # p = Popen(command), etc
        # p.wait(), etc
        return 

    def close(self):
        self.task_q.put(None)
        self.task_q.join()

if __name__ == '__main__':
    config = ["some value", "something else"]
    index = 7
    workers = []
    for i in range(5):
        worker = Worker(config, index)
        worker.run_long_command("ls /")
        workers.append(worker)
    for worker in workers:
        worker.fetch_results()

    # Do more work... (this would actually be done in a distributor in another class)

    for worker in workers:
        worker.close() 

编辑:我试图将ProcessWorker类和多处理队列的创建移到类之外,Worker然后尝试手动腌制工作实例。即使这样也不起作用,我得到一个错误

RuntimeError:队列对象只能通过继承在进程之间共享

. 但我只是将这些队列的引用传递给工作实例?我缺少一些基本的东西。这是主要部分的修改代码:

if __name__ == '__main__':
    config = ["some value", "something else"]
    index = 7
    workers = []
    for i in range(1):
        task_q = multiprocessing.JoinableQueue()
        result_q = multiprocessing.Queue()
        process_worker = ProcessWorker(task_q, result_q)
        worker = Worker(config, index, process_worker, task_q, result_q)
        something_to_look_at = pickle.dumps(worker) # FAIL:  Doesn't like queues??
        process_worker.start()
        worker.run_long_command("ls /")
4

3 回答 3

27

所以,问题是我假设 Python 正在做某种与 C++/fork() 工作方式不同的魔法。我不知何故认为 Python 只是复制了类,而不是将整个程序复制到一个单独的进程中。我严重浪费了几天试图让它工作,因为所有关于泡菜序列化的讨论让我认为它实际上是通过管道发送所有内容。我知道某些东西不能通过管道发送,但我认为我的问题是我没有正确包装东西。

如果 Python 文档让我从 10,000 英尺的角度了解使用此模块时会发生什么,这一切都可以避免。当然,它告诉我多进程模块的方法是做什么的,并给了我一些基本的例子,但我想知道的是幕后的“操作理论”是什么!这是我可以使用的信息。如果我的回答是关闭的,请插话。它将帮助我学习。

当您使用此模块运行启动进程时,整个程序将被复制到另一个进程中。但由于它不是“__main__“进程并且我的代码正在检查它,它不会无限地启动另一个进程。它只是停止并坐在那里等待某事做,就像僵尸一样。在调用时在父级中初始化的所有内容multiprocess.Process() 已全部设置好并准备就绪。一旦您将某些内容放入 multiprocess.Queue 或共享内存或管道等(无论您正在通信),那么单独的进程就会接收它并开始工作。它可以像父进程一样使用所有导入的模块和设置。但是,一旦父进程或单独进程中的某些内部状态变量发生更改,这些更改就会被隔离。一旦生成进程,现在就成为您的工作如有必要,它们可以通过队列、管道、共享内存等同步。

我扔掉了代码并重新开始,但现在我只在 中添加了一个额外的函数ProcessWorker,一个运行命令行的“执行”方法。很简单。我不必担心以这种方式启动然后关闭一堆进程,这给我带来了过去在 C++ 中的各种不稳定和性能问题。当我一开始切换到启动进程,然后将消息传递给那些等待的进程时,我的性能提高了,而且非常稳定。

顺便说一句,我查看了此链接以获取帮助,这让我很失望,因为该示例让我认为方法正在跨队列传输:http: //www.doughellmann.com/PyMOTW/multiprocessing/communication.html 第二个示例第一部分使用“next_task()”,它(对我来说)似乎正在执行通过队列接收的任务。

于 2013-01-06T06:03:54.613 回答
8

与其尝试发送方法本身(这是不切实际的),不如尝试发送要执行的方法的名称

假设每个工作人员运行相同的代码,这只是一个简单的getattr(self, task_name).

我会通过 tuples (task_name, task_args)task_args将 dict 直接提供给 task 方法的位置:

next_task_name, next_task_args = self.task_q.get()
if next_task_name:
  task = getattr(self, next_task_name)
  answer = task(**next_task_args)
  ...
else:
  # poison pill, shut down
  break
于 2013-01-05T07:44:33.817 回答
0

参考:https ://stackoverflow.com/a/14179779

大卫林奇在 1 月 6 日 6:03 的回答实际上并不正确,因为他说他被 http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html误导了。

提供的代码和示例是正确的,并且可以像宣传的那样工作。next_task() 正在执行通过队列接收到的任务——尝试了解该Task.__call__()方法在做什么。

就我而言,让我绊倒的是我的run(). 似乎子进程不会报告这一点,只是默默地失败——让事情陷入奇怪的循环!确保您运行某种语法检查器,例如 Emacs 中的 Flymake/Pyflakes。

通过 F 进行调试multiprocessing.log_to_stderr()帮助我缩小了问题的范围。

于 2013-11-28T04:06:44.393 回答