17

我想使用队列将数据从父进程传递到通过multiprocessing.Process. 但是,由于父进程使用 Python 的新asyncio库,队列方法需要是非阻塞的。据我了解,asyncio.Queue是为任务间通信而设计的,不能用于进程间通信。另外,我知道multiprocessing.Queueput_nowait()get_nowait()方法,但我实际上需要仍然会阻止当前任务(但不是整个过程)的协程。有没有办法创建包装put_nowait()/的协程get_nowait()?另一方面,multiprocessing.Queue使用内部兼容的线程是否与在同一进程中运行的事件循环完全兼容?

如果没有,我还有什么其他选择?我知道我可以通过使用异步套接字自己实现这样的队列,但我希望我可以避免这种情况……

编辑: 我也考虑过使用管道而不是套接字,但它似乎asynciomultiprocessing.Pipe(). 更准确地说,Pipe()返回不是类文件Connection对象的对象元组。但是,的方法/方法和/都需要类似文件的对象,因此不可能异步读取/写入这样的. 相比之下,包用作管道的通常类似文件的对象根本没有问题,并且可以很容易地与.asyncio.BaseEventLoopadd_reader()add_writer()connect_read_pipe()connect_write_pipe()Connectionsubprocessasyncio

更新: 我决定进一步探索管道方法:我Connection通过multiprocessing.Pipe()检索文件描述符fileno()并将其传递给os.fdopen(). 最后,我将生成的类文件对象传递给事件循环的connect_read_pipe()/ connect_write_pipe()。(如果有人对确切的代码感兴趣,有一些关于相关问题的邮件列表讨论read()。)但是,流给了我一个OSError: [Errno 9] Bad file descriptor并且我没有设法解决这个问题。还考虑到缺少对 Windows 的支持,我不会再继续这样做了。

4

3 回答 3

20

这是multiprocessing.Queue可以与 一起使用的对象的实现asyncio。它提供了整个multiprocessing.Queue接口,并添加了coro_getcoro_put方法,这些方法asyncio.coroutine可用于异步地从队列中获取/放入队列。实现细节与我另一个答案的第二个示例基本相同:ThreadPoolExecutor用于使get/put异步,amultiprocessing.managers.SyncManager.Queue用于在进程之间共享队列。唯一的附加技巧是实现__getstate__保持对象可拾取,尽管使用不可拾取ThreadPoolExecutor作为实例变量。

from multiprocessing import Manager, cpu_count
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def AsyncProcessQueue(maxsize=0):
    m = Manager()
    q = m.Queue(maxsize=maxsize)
    return _ProcQueue(q)   

class _ProcQueue(object):
    def __init__(self, q):
        self._queue = q
        self._real_executor = None
        self._cancelled_join = False

    @property
    def _executor(self):
        if not self._real_executor:
            self._real_executor = ThreadPoolExecutor(max_workers=cpu_count())
        return self._real_executor

    def __getstate__(self):
        self_dict = self.__dict__
        self_dict['_real_executor'] = None
        return self_dict

    def __getattr__(self, name):
        if name in ['qsize', 'empty', 'full', 'put', 'put_nowait',
                    'get', 'get_nowait', 'close']:
            return getattr(self._queue, name)
        else:
            raise AttributeError("'%s' object has no attribute '%s'" % 
                                    (self.__class__.__name__, name))

    @asyncio.coroutine
    def coro_put(self, item):
        loop = asyncio.get_event_loop()
        return (yield from loop.run_in_executor(self._executor, self.put, item))

    @asyncio.coroutine    
    def coro_get(self):
        loop = asyncio.get_event_loop()
        return (yield from loop.run_in_executor(self._executor, self.get))

    def cancel_join_thread(self):
        self._cancelled_join = True
        self._queue.cancel_join_thread()

    def join_thread(self):
        self._queue.join_thread()
        if self._real_executor and not self._cancelled_join:
            self._real_executor.shutdown()

@asyncio.coroutine
def _do_coro_proc_work(q, stuff, stuff2):
    ok = stuff + stuff2
    print("Passing %s to parent" % ok)
    yield from q.coro_put(ok)  # Non-blocking
    item = q.get() # Can be used with the normal blocking API, too
    print("got %s back from parent" % item)

def do_coro_proc_work(q, stuff, stuff2):
    loop = asyncio.get_event_loop()
    loop.run_until_complete(_do_coro_proc_work(q, stuff, stuff2))

@asyncio.coroutine
def do_work(q):
    loop.run_in_executor(ProcessPoolExecutor(max_workers=1),
                         do_coro_proc_work, q, 1, 2)
    item = yield from q.coro_get()
    print("Got %s from worker" % item)
    item = item + 25
    q.put(item)

if __name__  == "__main__":
    q = AsyncProcessQueue()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_work(q))

输出:

Passing 3 to parent
Got 3 from worker
got 28 back from parent

如您所见,您可以AsyncProcessQueue从父进程或子进程同步和异步使用。它不需要任何全局状态,并且通过将大部分复杂性封装在一个类中,使用起来比我原来的答案更优雅。

您可能能够直接使用套接字获得更好的性能,但是以跨平台的方式使其工作似乎非常棘手。这还具有可在多个工人之间使用的优点,不需要您自己腌制/解封等。

于 2014-07-11T19:17:04.503 回答
5

不幸的是,该multiprocessing库并不是特别适合与 一起使用。asyncio但是,根据您计划使用multiprocessing/的multprocessing.Queue方式,您可以将其完全替换为concurrent.futures.ProcessPoolExecutor

import asyncio
from concurrent.futures import ProcessPoolExecutor


def do_proc_work(stuff, stuff2):  # This runs in a separate process
    return stuff + stuff2

@asyncio.coroutine
def do_work():
    out = yield from loop.run_in_executor(ProcessPoolExecutor(max_workers=1),
                                          do_proc_work, 1, 2)
    print(out)

if __name__  == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_work())

输出:

3

如果您绝对需要 a multiprocessing.Queue,与 结合使用时它似乎会表现良好ProcessPoolExecutor

import asyncio
import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def do_proc_work(q, stuff, stuff2):
    ok = stuff + stuff2
    time.sleep(5) # Artificial delay to show that it's running asynchronously
    print("putting output in queue")
    q.put(ok)

@asyncio.coroutine
def async_get(q):
    """ Calls q.get() in a separate Thread. 

    q.get is an I/O call, so it should release the GIL.
    Ideally there would be a real non-blocking I/O-based 
    Queue.get call that could be used as a coroutine instead 
    of this, but I don't think one exists.

    """
    return (yield from loop.run_in_executor(ThreadPoolExecutor(max_workers=1), 
                                           q.get))

@asyncio.coroutine
def do_work(q):
    loop.run_in_executor(ProcessPoolExecutor(max_workers=1),
                         do_proc_work, q, 1, 2)
    coro = async_get(q) # You could do yield from here; I'm not just to show that it's asynchronous
    print("Getting queue result asynchronously")
    print((yield from coro))

if __name__  == "__main__":
    m = multiprocessing.Manager()
    q = m.Queue() # The queue must be inherited by our worker, it can't be explicitly passed in
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_work(q))

输出:

Getting queue result asynchronously
putting output in queue
3
于 2014-07-10T23:12:31.347 回答
1

aiopipe ( https://pypi.org/project/aiopipe/ ) 看起来像是一针见血。

至少它帮助了我..

于 2020-10-20T14:46:41.490 回答