130

I am having a lot of trouble understanding how multiprocessing queues work in Python and how to implement them. Let's say I have two Python modules that access data from a shared file; let's call these two modules a writer and a reader. My plan is to have both the reader and writer put requests into two separate multiprocessing queues, and then have a third process pop these requests in a loop and execute them accordingly.

My main problem is that I really have no idea how to implement multiprocessing.Queue correctly. You cannot really instantiate the object for each process, since they would be separate queues; how do you make sure that all processes relate to a shared queue (or in this case, queues)?


7 Answers

158

My main problem is that I really have no idea how to implement multiprocessing.Queue correctly. You cannot really instantiate the object for each process, since they would be separate queues; how do you make sure that all processes relate to a shared queue (or in this case, queues)?

Here is a simple example of a reader and a writer sharing a single queue... The writer sends a bunch of integers to the reader; when the writer runs out of numbers, it sends "DONE", which lets the reader know to break out of the read loop.

You can spawn as many reader processes as you like...

from multiprocessing import Process, Queue
import time
import sys


def reader_proc(queue):
    """Read from the queue; this spawns as a separate Process"""
    while True:
        msg = queue.get()  # Read from the queue and do nothing
        if msg == "DONE":
            break


def writer(count, num_of_reader_procs, queue):
    """Write integers into the queue.  A reader_proc() will read them from the queue"""
    for ii in range(0, count):
        queue.put(ii)  # Put 'count' numbers into queue

    ### Tell all readers to stop...
    for ii in range(0, num_of_reader_procs):
        queue.put("DONE")


def start_reader_procs(qq, num_of_reader_procs):
    """Start the reader processes and return all in a list to the caller"""
    all_reader_procs = list()
    for ii in range(0, num_of_reader_procs):
        ### reader_p() reads from qq as a separate process...
        ###    you can spawn as many reader_p() as you like
        ###    however, there is usually a point of diminishing returns
        reader_p = Process(target=reader_proc, args=((qq),))
        reader_p.daemon = True
        reader_p.start()  # Launch reader_p() as another proc

        all_reader_procs.append(reader_p)

    return all_reader_procs


if __name__ == "__main__":
    num_of_reader_procs = 2
    qq = Queue()  # writer() writes to qq from _this_ process
    for count in [10**4, 10**5, 10**6]:
        assert num_of_reader_procs > 0
        assert num_of_reader_procs < 4
        all_reader_procs = start_reader_procs(qq, num_of_reader_procs)

        writer(count, len(all_reader_procs), qq)  # Queue stuff to all reader_p()
        print("All reader processes are pulling numbers from the queue...")

        _start = time.time()
        for idx, a_reader_proc in enumerate(all_reader_procs):
            print("    Waiting for reader_p.join() index %s" % idx)
            a_reader_proc.join()  # Wait for a_reader_proc() to finish

            print("        reader_p() idx:%s is done" % idx)

        print(
            "Sending {0} integers through Queue() took {1} seconds".format(
                count, (time.time() - _start)
            )
        )
        print("")
Answered on 2012-07-17T05:18:04.190
16

Here is a dead simple usage of multiprocessing.Queue and multiprocessing.Process that allows callers to send an "event" plus arguments to a separate process, which dispatches the event to a "do_" method on the process. (Python 3.4+)

import multiprocessing as mp
import collections

Msg = collections.namedtuple('Msg', ['event', 'args'])

class BaseProcess(mp.Process):
    """A process backed by an internal queue for simple one-way message passing.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.queue = mp.Queue()

    def send(self, event, *args):
        """Puts the event and args as a `Msg` on the queue
        """
        msg = Msg(event, args)
        self.queue.put(msg)

    def dispatch(self, msg):
        event, args = msg

        handler = getattr(self, "do_%s" % event, None)
        if not handler:
            raise NotImplementedError("Process has no handler for [%s]" % event)

        handler(*args)

    def run(self):
        while True:
            msg = self.queue.get()
            self.dispatch(msg)

Usage:

class MyProcess(BaseProcess):
    def do_helloworld(self, arg1, arg2):
        print(arg1, arg2)

if __name__ == "__main__":
    process = MyProcess()
    process.start()
    process.send('helloworld', 'hello', 'world')

send happens in the parent process, while do_* happens in the child process.

I left out any exception handling that would obviously interrupt the run loop and exit the child process. You can also customize it by overriding run to control blocking or whatever else.

This is really only useful in situations where you have a single worker process, but I think it is a relevant answer to this question to demonstrate a common scenario with a little more object orientation.
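As a hedged sketch of the "override run / control exit" idea above: the variant below is not from the original answer (StoppableProcess, stop(), and the 'stop' event name are made up for illustration); it adds an explicit stop message so the parent can shut the child down and join() it.

import multiprocessing as mp
import collections

Msg = collections.namedtuple('Msg', ['event', 'args'])


class StoppableProcess(mp.Process):
    """Like BaseProcess above, but with an explicit stop() message."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.queue = mp.Queue()

    def send(self, event, *args):
        self.queue.put(Msg(event, args))

    def stop(self):
        # A plain string sentinel survives pickling across the queue,
        # unlike an object() identity sentinel would.
        self.queue.put(Msg('stop', ()))

    def run(self):
        while True:
            event, args = self.queue.get()
            if event == 'stop':
                break
            handler = getattr(self, 'do_%s' % event, None)
            if handler is None:
                raise NotImplementedError('no handler for [%s]' % event)
            handler(*args)


class Greeter(StoppableProcess):
    def do_hello(self, name):
        print('hello,', name)


if __name__ == '__main__':
    p = Greeter()
    p.start()
    p.send('hello', 'world')
    p.stop()
    p.join()  # returns once the child has seen the stop message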

Answered on 2019-07-22T06:07:57.330
13

I had a look at multiple answers across Stack Overflow and the web while trying to set up a way of doing multiprocessing using queues for passing around large pandas DataFrames. It seemed to me that every answer was reiterating the same kind of solution without any consideration of the multitude of edge cases one will definitely come across when setting up calculations like these. The problem is that many things are at play at the same time: the number of tasks, the number of workers, the duration of each task, and possible exceptions during task execution. All of these make synchronization tricky, and most answers do not address how you can go about it. So this is my take after fiddling around for a few hours; hopefully it is generic enough for most people to find it useful.

Some thoughts before any coding examples. Since queue.Empty, queue.qsize(), or any other similar method is unreliable for flow control, any code like

while True:
    try:
        task = pending_queue.get_nowait()
    except queue.Empty:
        break

is bogus. This will kill the worker even if, milliseconds later, another task turns up in the queue. The worker will not recover, and after a while ALL the workers will disappear as they randomly find the queue momentarily empty. The end result is that the main multiprocessing function (the one with the join() on the processes) returns without all the tasks having completed. Good luck debugging that if you have thousands of tasks and a few are missing. (A short sketch of a safer loop follows.)
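For illustration, here is a condensed sketch of the safer pattern (names like safe_worker are just for this sketch; the full do_work()/par_proc() example further down implements the same idea): treat an empty queue as "wait and retry", and only exit on an explicit sentinel.

import multiprocessing as mp
import queue
import time

SENTINEL = None


def safe_worker(pending_queue):
    while True:
        try:
            task = pending_queue.get_nowait()
        except queue.Empty:
            time.sleep(0.01)  # the queue may only be momentarily empty; keep polling
            continue
        if task is SENTINEL:  # explicit end-of-work marker, not an empty queue
            break
        print('processing', task)


if __name__ == '__main__':
    q = mp.Queue()
    for item in range(3):
        q.put(item)
    q.put(SENTINEL)  # one sentinel per worker; there is a single worker here
    worker = mp.Process(target=safe_worker, args=(q,))
    worker.start()
    worker.join()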

Another issue is the use of sentinel values. Many people suggest adding a sentinel value to the queue to flag its end. But to flag it to whom, exactly? If there are N workers, assuming N is the number of cores available, then a single sentinel value will only flag the end of the queue to one worker. All the other workers will sit waiting for more work when there is none left. A typical example I have seen is

while True:
    task = pending_queue.get()
    if task == SOME_SENTINEL_VALUE:
        break

One worker will get the sentinel value while the rest will wait indefinitely. No post I came across mentioned that you need to submit the sentinel value to the queue at least as many times as you have workers, so that ALL of them get it.

The next issue is the handling of exceptions during task execution. Again, these should be caught and managed. Moreover, if you have a completed_tasks queue, you should independently count, in a deterministic way, how many items are in that queue before you decide that the job is done. Relying on queue sizes is, once more, bound to fail and return unexpected results.

In the example below, the par_proc() function receives a list of jobs, each including the function with which its tasks should be executed along with any named arguments and values.

import multiprocessing as mp
import dill as pickle
import queue
import time
import psutil

SENTINEL = None


def do_work(tasks_pending, tasks_completed):
    # Get the current worker's name
    worker_name = mp.current_process().name

    while True:
        try:
            task = tasks_pending.get_nowait()
        except queue.Empty:
            print(worker_name + ' found an empty queue. Sleeping for a while before checking again...')
            time.sleep(0.01)
        else:
            try:
                if task == SENTINEL:
                    print(worker_name + ' no more work left to be done. Exiting...')
                    break

                print(worker_name + ' received some work... ')
                time_start = time.perf_counter()
                work_func = pickle.loads(task['func'])
                result = work_func(**task['task'])
                tasks_completed.put({work_func.__name__: result})
                time_end = time.perf_counter() - time_start
                print(worker_name + ' done in {} seconds'.format(round(time_end, 5)))
            except Exception as e:
                print(worker_name + ' task failed. ' + str(e))
                tasks_completed.put({work_func.__name__: None})


def par_proc(job_list, num_cpus=None):

    # Get the number of cores
    if not num_cpus:
        num_cpus = psutil.cpu_count(logical=False)

    print('* Parallel processing')
    print('* Running on {} cores'.format(num_cpus))

    # Set-up the queues for sending and receiving data to/from the workers
    tasks_pending = mp.Queue()
    tasks_completed = mp.Queue()

    # Gather processes and results here
    processes = []
    results = []

    # Count tasks
    num_tasks = 0

    # Add the tasks to the queue
    for job in job_list:
        for task in job['tasks']:
            expanded_job = {}
            num_tasks = num_tasks + 1
            expanded_job.update({'func': pickle.dumps(job['func'])})
            expanded_job.update({'task': task})
            tasks_pending.put(expanded_job)

    # Use as many workers as there are cores (usually chokes the system so better use less)
    num_workers = num_cpus

    # We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
    # work left to be done.
    for c in range(num_workers):
        tasks_pending.put(SENTINEL)

    print('* Number of tasks: {}'.format(num_tasks))

    # Set-up and start the workers
    for c in range(num_workers):
        p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed))
        p.name = 'worker' + str(c)
        processes.append(p)
        p.start()

    # Gather the results
    completed_tasks_counter = 0
    while completed_tasks_counter < num_tasks:
        results.append(tasks_completed.get())
        completed_tasks_counter = completed_tasks_counter + 1

    for p in processes:
        p.join()

    return results

Here is a test you can run against the above code:

def test_parallel_processing():
    def heavy_duty1(arg1, arg2, arg3):
        return arg1 + arg2 + arg3

    def heavy_duty2(arg1, arg2, arg3):
        return arg1 * arg2 * arg3

    task_list = [
        {'func': heavy_duty1, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
        {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
    ]

    results = par_proc(task_list)

    job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
    job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])

    assert job1 == 15
    assert job2 == 21

plus another one with some exceptions:

def test_parallel_processing_exceptions():
    def heavy_duty1_raises(arg1, arg2, arg3):
        raise ValueError('Exception raised')
        return arg1 + arg2 + arg3

    def heavy_duty2(arg1, arg2, arg3):
        return arg1 * arg2 * arg3

    task_list = [
        {'func': heavy_duty1_raises, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
        {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
    ]

    results = par_proc(task_list)

    job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
    job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])

    assert not job1
    assert job2 == 21

Hope that helps.

Answered on 2020-08-03T17:20:51.030
9

The Queue you get from "from queue import Queue" only works between threads inside a single process; for separate processes, multiprocessing should be used instead. Therefore, it should look like "from multiprocessing import Queue".
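For reference, a minimal sketch of the corrected import in use (the worker function and names here are purely illustrative):

from multiprocessing import Process, Queue


def worker(q):
    q.put('hello from the child process')


if __name__ == '__main__':
    q = Queue()  # multiprocessing.Queue can be shared across processes
    p = Process(target=worker, args=(q,))
    p.start()
    print(q.get())  # receives the message sent by the child
    p.join()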

Answered on 2018-02-03T01:11:45.423
4

I just made a simple and general example for demonstrating passing a message over a Queue between two standalone programs. It does not directly answer the OP's question, but it should be clear enough to indicate the concept.

Server:

multiprocessing-queue-manager-server.py

import asyncio
import concurrent.futures
import multiprocessing
import multiprocessing.managers
import queue
import sys
import threading
from typing import Any, AnyStr, Dict, Union


class QueueManager(multiprocessing.managers.BaseManager):

    def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
        pass


def get_queue(ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
    global q

    if not ident in q:
        q[ident] = multiprocessing.Queue()

    return q[ident]


q: Dict[Union[AnyStr, int, type(None)], multiprocessing.Queue] = dict()
delattr(QueueManager, 'get_queue')


def init_queue_manager_server():
    if not hasattr(QueueManager, 'get_queue'):
        QueueManager.register('get_queue', get_queue)


def serve(no: int, term_ev: threading.Event):
    manager: QueueManager
    with QueueManager(authkey=QueueManager.__name__.encode()) as manager:
        print(f"Server address {no}: {manager.address}")

        while not term_ev.is_set():
            try:
                item: Any = manager.get_queue().get(timeout=0.1)
                print(f"Client {no}: {item} from {manager.address}")
            except queue.Empty:
                continue


async def main(n: int):
    init_queue_manager_server()
    term_ev: threading.Event = threading.Event()
    executor: concurrent.futures.ThreadPoolExecutor = concurrent.futures.ThreadPoolExecutor()

    i: int
    for i in range(n):
        asyncio.ensure_future(asyncio.get_running_loop().run_in_executor(executor, serve, i, term_ev))

    # Gracefully shut down
    try:
        await asyncio.get_running_loop().create_future()
    except asyncio.CancelledError:
        term_ev.set()
        executor.shutdown()
        raise


if __name__ == '__main__':
    asyncio.run(main(int(sys.argv[1])))

Client:

multiprocessing-queue-manager-client.py

import multiprocessing
import multiprocessing.managers
import os
import sys
from typing import AnyStr, Union


class QueueManager(multiprocessing.managers.BaseManager):

    def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
        pass


delattr(QueueManager, 'get_queue')


def init_queue_manager_client():
    if not hasattr(QueueManager, 'get_queue'):
        QueueManager.register('get_queue')


def main():
    init_queue_manager_client()

    manager: QueueManager = QueueManager(sys.argv[1], authkey=QueueManager.__name__.encode())
    manager.connect()

    message = f"A message from {os.getpid()}"
    print(f"Message to send: {message}")
    manager.get_queue().put(message)


if __name__ == '__main__':
    main()

Usage

Server:

$ python3 multiprocessing-queue-manager-server.py N

N is an integer indicating how many servers should be created. Copy one of the <server-address-N> values printed by the server and pass it as the first argument to each multiprocessing-queue-manager-client.py.

Client:

python3 multiprocessing-queue-manager-client.py <server-address-1>

Result

Server:

Client 1: <item> from <server-address-1>

Gist: https://gist.github.com/89062d639e40110c61c2f88018a8b0e5


UPD: I have created a package here.

Server:

import ipcq


with ipcq.QueueManagerServer(address=ipcq.Address.AUTO, authkey=ipcq.AuthKey.AUTO) as server:
    server.get_queue().get()

Client:

import ipcq


client = ipcq.QueueManagerClient(address=ipcq.Address.AUTO, authkey=ipcq.AuthKey.AUTO)
client.get_queue().put('a message')


Answered on 2020-06-27T09:53:58.520
2

We implemented two versions of this: one is a simple multi-threaded pool that can execute many types of callables, making our lives much easier, and the second version uses processes, which is less flexible in terms of callables and requires an extra call to dill.

Setting frozen_pool to True will freeze execution until finish_pool_queue is called in either class.

Thread version:

'''
Created on Nov 4, 2019

@author: Kevin
'''
from threading import Lock, Thread
from queue import Queue
import traceback
from helium.loaders.loader_retailers import print_info
from time import sleep
import signal
import os

class ThreadPool(object):
    def __init__(self, queue_threads, *args, **kwargs):
        self.frozen_pool = kwargs.get('frozen_pool', False)
        self.print_queue = kwargs.get('print_queue', True)
        self.pool_results = []
        self.lock = Lock()
        self.queue_threads = queue_threads
        self.queue = Queue()
        self.threads = []

        for i in range(self.queue_threads):
            t = Thread(target=self.make_pool_call)
            t.daemon = True
            t.start()
            self.threads.append(t)

    def make_pool_call(self):
        while True:
            if self.frozen_pool:
                #print '--> Queue is frozen'
                sleep(1)
                continue

            item = self.queue.get()
            if item is None:
                break

            call = item.get('call', None)
            args = item.get('args', [])
            kwargs = item.get('kwargs', {})
            keep_results = item.get('keep_results', False)

            try:
                result = call(*args, **kwargs)

                if keep_results:
                    self.lock.acquire()
                    self.pool_results.append((item, result))
                    self.lock.release()

            except Exception as e:
                self.lock.acquire()
                print(e)
                traceback.print_exc()
                self.lock.release()
                os.kill(os.getpid(), signal.SIGUSR1)

            self.queue.task_done()

    def finish_pool_queue(self):
        self.frozen_pool = False

        while self.queue.unfinished_tasks > 0:
            if self.print_queue:
                print_info('--> Thread pool... %s' % self.queue.unfinished_tasks)
            sleep(5)

        self.queue.join()

        for i in range(self.queue_threads):
            self.queue.put(None)

        for t in self.threads:
            t.join()

        del self.threads[:]

    def get_pool_results(self):
        return self.pool_results

    def clear_pool_results(self):
        del self.pool_results[:]

Process version:

'''
Created on Nov 4, 2019

@author: Kevin
'''
import traceback
from helium.loaders.loader_retailers import print_info
from time import sleep
import signal
import os
from multiprocessing import Queue, Process, Value, Array, JoinableQueue, Lock,\
    RawArray, Manager
import dill
import ctypes
from helium.misc.utils import ignore_exception
from mem_top import mem_top
import gc

class ProcessPool(object):
    def __init__(self, queue_processes, *args, **kwargs):
        self.frozen_pool = Value(ctypes.c_bool, kwargs.get('frozen_pool', False))
        self.print_queue = kwargs.get('print_queue', True)
        self.manager = Manager()
        self.pool_results = self.manager.list()
        self.queue_processes = queue_processes
        self.queue = JoinableQueue()
        self.processes = []

        for i in range(self.queue_processes):
            p = Process(target=self.make_pool_call)
            p.start()
            self.processes.append(p)

        print('Processes', self.queue_processes)

    def make_pool_call(self):
        while True:
            if self.frozen_pool.value:
                sleep(1)
                continue

            item_pickled = self.queue.get()

            if item_pickled is None:
                #print '--> Ending'
                self.queue.task_done()
                break

            item = dill.loads(item_pickled)

            call = item.get('call', None)
            args = item.get('args', [])
            kwargs = item.get('kwargs', {})
            keep_results = item.get('keep_results', False)

            try:
                result = call(*args, **kwargs)

                if keep_results:
                    self.pool_results.append(dill.dumps((item, result)))
                else:
                    del call, args, kwargs, keep_results, item, result

            except Exception as e:
                print(e)
                traceback.print_exc()
                os.kill(os.getpid(), signal.SIGUSR1)

            self.queue.task_done()

    def finish_pool_queue(self, callable=None):
        self.frozen_pool.value = False

        while self.queue._unfinished_tasks.get_value() > 0:
            if self.print_queue:
                print_info('--> Process pool... %s' % (self.queue._unfinished_tasks.get_value()))

            if callable:
                callable()

            sleep(5)

        for i in range(self.queue_processes):
            self.queue.put(None)

        self.queue.join()
        self.queue.close()

        for p in self.processes:
            with ignore_exception: p.join(10)
            with ignore_exception: p.terminate()

        with ignore_exception: del self.processes[:]

    def get_pool_results(self):
        return self.pool_results

    def clear_pool_results(self):
        del self.pool_results[:]


def test(eg):
    print('EG', eg)

Call with:

tp = ThreadPool(queue_threads=2)
tp.queue.put({'call': test, 'args': [random.randint(0, 100)]})
tp.finish_pool_queue()

or

pp = ProcessPool(queue_processes=2)
pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]}))
pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]}))
pp.finish_pool_queue()
Answered on 2019-12-04T03:57:42.870
2

A multi-producer and multi-consumer example, verified. It should be easy to modify to cover other cases: single/multiple producers, single/multiple consumers.

from multiprocessing import Process, JoinableQueue
import time
import os

q = JoinableQueue()

def producer():
    for item in range(30):
        time.sleep(2)
        q.put(item)
    pid = os.getpid()
    print(f'producer {pid} done')


def worker():
    while True:
        item = q.get()
        pid = os.getpid()
        print(f'pid {pid} Working on {item}')
        print(f'pid {pid} Finished {item}')
        q.task_done()

for i in range(5):
    p = Process(target=worker, daemon=True)
    p.start()

# send thirty task requests to the worker
producers = []
for i in range(2):
    p = Process(target=producer)
    producers.append(p)
    p.start()

# make sure producers done
for p in producers:
    p.join()

# block until all workers are done
q.join()
print('All work completed')

Explanation:

  1. There are two producers and five consumers in this example.
  2. A JoinableQueue is used to make sure every element stored in the queue gets processed. 'task_done' is how a worker signals that an item is finished. 'q.join()' waits until all items have been marked as done.
  3. With #2, there is no need to join and wait for every worker.
  4. But it is important to join and wait for every producer to store its elements into the queue. Otherwise, the program exits immediately.
Answered on 2021-01-18T21:42:24.603