34

我试图弄清楚如何移植一个线程程序以使用asyncio. 我有很多围绕几个标准库同步的代码Queues,基本上是这样的:

import queue, random, threading, time

q = queue.Queue()

def produce():
    while True:
        time.sleep(0.5 + random.random())  # sleep for .5 - 1.5 seconds
        q.put(random.random())

def consume():
    while True: 
        value = q.get(block=True)
        print("Consumed", value)

threading.Thread(target=produce).start()
threading.Thread(target=consume).start()

一个线程创建值(可能是用户输入),另一个线程对它们进行处理。关键是这些线程在有新数据之前处于空闲状态,此时它们会醒来并对其进行处理。

我正在尝试使用 asyncio 来实现这种模式,但我似乎无法弄清楚如何让它“运行”。

我的尝试或多或少看起来像这样(并且根本不做任何事情)。

import asyncio, random

q = asyncio.Queue()

@asyncio.coroutine
def produce():
    while True: 
        q.put(random.random())
        yield from asyncio.sleep(0.5 + random.random())

@asyncio.coroutine
def consume():
    while True:
        value = yield from q.get()
        print("Consumed", value)

# do something here to start the coroutines. asyncio.Task()? 

loop = asyncio.get_event_loop()
loop.run_forever()

我尝试过使用协程的变体,不使用它们,在任务中包装东西,试图让它们创建或返回期货等。

我开始认为我对应该如何使用 asyncio 有错误的想法(也许这种模式应该以我不知道的不同方式实现)。任何指针将不胜感激。

4

3 回答 3

38

对,就是这样。任务是你的朋友:

import asyncio, random

q = asyncio.Queue()

@asyncio.coroutine
def produce():
    while True:
        yield from q.put(random.random())
        yield from asyncio.sleep(0.5 + random.random())

@asyncio.coroutine
def consume():
    while True:
        value = yield from q.get()
        print("Consumed", value)


loop = asyncio.get_event_loop()
loop.create_task(produce())
loop.create_task(consume())
loop.run_forever()

asyncio.ensure_future也可用于创建任务。

请记住:q.put()协程,所以你应该使用yield from q.put(value).

UPD

asyncio.Task()/切换asyncio.async()到新品牌 APIloop.create_task()asyncio.ensure_future()示例。

于 2014-05-26T09:41:44.763 回答
5

这是我在生产中使用的,转移到要点:https ://gist.github.com/thehesiod/7081ab165b9a0d4de2e07d321cc2391d

于 2016-06-23T22:20:29.360 回答
2

稍后,也许是 OT,请记住,您可以Queue从多个任务中消费,因为它们是独立的消费者。

以下片段作为示例显示了如何使用asyncio任务实现相同的线程池模式。

q = asyncio.Queue()

async def sum(x):
    await asyncio.sleep(0.1)  # simulates asynchronously
    return x

async def consumer(i):
    print("Consumer {} started".format(i))
    while True:
        f, x = await q.get()
        print("Consumer {} procesing {}".format(i, x))
        r = await sum(x)
        f.set_result(r)

async def producer():
    consumers = [asyncio.ensure_future(consumer(i)) for i in range(5)]
    loop = asyncio.get_event_loop()
    tasks = [(asyncio.Future(), x) for x in range(10)]
    for task in tasks:
        await q.put(task)

    # wait until all futures are completed
    results = await asyncio.gather(*[f for f, _ in tasks])
    assert results == [r for _, r in tasks]

    # destroy tasks
    for c in consumers:
        c.cancel()


asyncio.get_event_loop().run_until_complete(producer())
于 2016-09-22T11:17:34.913 回答