0

我有一个tornado应用程序需要在ProcessPoolExecutor. blinker这个阻塞函数使用了一个通过事件发出增量结果的库。我想收集这些事件并tornado在它们发生时将它们发送回我的应用程序。

起初,tornado它似乎是这个用例的理想选择,因为它是异步的。我想我可以简单地将一个tornado.queues.Queue对象传递给要在池上运行的函数,然后将事件作为我的事件回调put()的一部分传递到这个队列中。blinker

但是,阅读 的文档后tornado.queues.Queue,我了解到它们不是跨进程管理multiprocessing.Queue的,也不是线程安全的。

有没有办法从pool它们发生时检索这些事件?我应该包装multiprocessing.Queue让它产生Futures吗?这似乎不太可能奏效,因为我怀疑其内部multiprocessing是否与tornado.

[编辑] 这里有一些很好的线索:https ://gist.github.com/hoffrocket/8050711

4

2 回答 2

1

要收集传递给 a 的任务的返回值以外的任何内容ProcessPoolExecutor,您必须使用 a multiprocessing.Queue(或库中的其他对象multiprocessing)。然后,由于multiprocessing.Queue只公开了一个同步接口,您必须使用父进程中的另一个线程从队列中读取(无需了解实现细节。这里可以使用一个文件描述符,但我们现在将忽略它,因为它是未记录并可能更改)。

这是一个未经测试的快速示例:

queue = multiprocessing.Queue()
proc_pool = concurrent.futures.ProcessPoolExecutor()
thread_pool = concurrent.futures.ThreadPoolExecutor()

async def read_events():
    while True:
        event = await thread_pool.submit(queue.get)
        print(event)

async def foo():
    IOLoop.current.spawn_callback(read_events)
    await proc_pool.submit(do_something_and_write_to_queue)
于 2017-02-23T18:33:29.983 回答
0

你可以做的比这更简单。这是一个向子进程提交四个慢速函数调用并等待它们的协程:

from concurrent.futures import ProcessPoolExecutor
from time import sleep

from tornado import gen, ioloop

pool = ProcessPoolExecutor()


def calculate_slowly(x):
    sleep(x)
    return x


async def parallel_tasks():
    # Create futures in a randomized order.
    futures = [gen.convert_yielded(pool.submit(calculate_slowly, i))
               for i in [1, 3, 2, 4]]

    wait_iterator = gen.WaitIterator(*futures)
    while not wait_iterator.done():
        try:
            result = await wait_iterator.next()
        except Exception as e:
            print("Error {} from {}".format(e, wait_iterator.current_future))
        else:
            print("Result {} received from future number {}".format(
                result, wait_iterator.current_index))


ioloop.IOLoop.current().run_sync(parallel_tasks)

它输出:

Result 1 received from future number 0
Result 2 received from future number 2
Result 3 received from future number 1
Result 4 received from future number 3

你可以看到协程按照它们完成的顺序接收结果,而不是它们提交的顺序:future number 1 在future number 2 之后解析,因为future number 1 睡得更久。convert_yielded 将 ProcessPoolExecutor 返回的 Futures 转换为 Tornado 兼容的 Futures,可以在协程中等待。

每个future 解析为calculate_slowly 返回的值:在这种情况下,它与传递给calculate_slowly 的数字相同,并且与calculate_slowly 休眠的秒数相同。

要将其包含在 RequestHandler 中,请尝试以下操作:

class MainHandler(web.RequestHandler):
    async def get(self):
        self.write("Starting....\n")
        self.flush()

        futures = [gen.convert_yielded(pool.submit(calculate_slowly, i))
                   for i in [1, 3, 2, 4]]

        wait_iterator = gen.WaitIterator(*futures)
        while not wait_iterator.done():
            result = await wait_iterator.next()
            self.write("Result {} received from future number {}\n".format(
                result, wait_iterator.current_index))

            self.flush()


if __name__ == "__main__":
    application = web.Application([
        (r"/", MainHandler),
    ])
    application.listen(8888)
    ioloop.IOLoop.instance().start()

您可以观察curl localhost:8888服务器是否以增量方式响应客户端请求。

于 2017-02-21T19:14:19.887 回答