
Consider this code:

#!/usr/bin/env python
# coding=utf-8
from string import letters


def filter_upper(letters):
    for letter in letters:
        if letter.isupper():
            yield letter

def filter_selected(letters, selected):
    selected = set(map(str.lower, selected))
    for letter in letters:
        if letter.lower() in selected:
            yield letter


def main():
    stuff = filter_selected(filter_upper(letters), ['a', 'b', 'c'])
    print(list(stuff))

main()

This is an illustration of a pipeline built from generators. I use this pattern often in practice to build data-processing flows. It's like a UNIX pipe.

What is the most elegant way to refactor these generators into coroutines that suspend execution at each `yield`?

Update

My first attempt looked like this:

#!/usr/bin/env python
# coding=utf-8

import asyncio

@asyncio.coroutine
def coro():
    for e in ['a', 'b', 'c']:
        future = asyncio.Future()
        future.set_result(e)
        yield from future


@asyncio.coroutine
def coro2():
    a = yield from coro()
    print(a)


loop = asyncio.get_event_loop()
loop.run_until_complete(coro2())

But for some reason it doesn't work: the variable `a` ends up as `None`.

Update #1

What I came up with recently:

Server:

#!/usr/bin/env python
# coding=utf-8
"""Server that accepts a client and send it strings from user input."""


import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = ''
port = 5555

s.bind((host, port))
s.listen(1)

print('Listening...')

conn, addr = s.accept()

print('Client ({}) connected.'.format(addr))

while True:
    conn.send(raw_input('Enter data to send: '))

Client:

#!/usr/bin/env python
# coding=utf-8
"""Client that demonstrates processing pipeline."""

import trollius as asyncio
from trollius import From


@asyncio.coroutine
def upper(input, output):
    while True:
        char = yield From(input.get())
        print('Got char: ', char)
        yield From(output.put(char.upper()))


@asyncio.coroutine
def glue(input, output):
    chunk = []
    while True:
        e = yield From(input.get())
        chunk.append(e)
        print('Current chunk: ', chunk)
        if len(chunk) == 3:
            yield From(output.put(chunk))
            chunk = []


@asyncio.coroutine
def tcp_echo_client(loop):
    reader, writer = yield From(asyncio.open_connection('127.0.0.1', 5555,
                                                        loop=loop))
    q1 = asyncio.Queue()
    q2 = asyncio.Queue()
    q3 = asyncio.Queue()

    @asyncio.coroutine
    def printer():
        while True:
            print('Pipeline output: ', (yield From(q3.get())))

    asyncio.async(upper(q1, q2))
    asyncio.async(glue(q2, q3))
    asyncio.async(printer())

    while True:
        data = yield From(reader.read(100))
        print('Data: ', data)

        for byte in data:
            yield From(q1.put(byte))

    print('Close the socket')
    writer.close()


@asyncio.coroutine
def background_stuff():
    while True:
        yield From(asyncio.sleep(3))
        print('Other background stuff...')


loop = asyncio.get_event_loop()
asyncio.async(background_stuff())
loop.run_until_complete(tcp_echo_client(loop))
loop.close()

The advantage over "David Beazley's coroutines" is that you can use all of the `asyncio` machinery inside such processing units, together with their `input` and `output` queues.
The disadvantage here is that connecting pipeline units requires a lot of queue instances. That could be fixed by using a data structure more advanced than `asyncio.Queue`.
Another disadvantage is that processing units of this kind do not propagate their exceptions to the parent stack frame, because they are "background tasks", whereas "David Beazley's coroutines" do propagate them.
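To make the exception-propagation point concrete, here is a minimal Python 3 sketch (my addition, not from the original post; `failing_worker` is a hypothetical stage): a worker scheduled as a fire-and-forget background task keeps its exception to itself, so one workaround is to keep the task handle and await it somewhere in the parent.

```python
import asyncio


async def failing_worker(q):
    # Consumes one item, then raises. As a fire-and-forget background
    # task this exception would only surface as a warning at shutdown.
    item = await q.get()
    raise ValueError("bad item: {!r}".format(item))


async def main():
    q = asyncio.Queue()
    # Keep the task handle instead of discarding it.
    task = asyncio.ensure_future(failing_worker(q))
    await q.put("x")
    try:
        await task  # awaiting the task propagates the worker's exception here
    except ValueError as exc:
        print("caught:", exc)


asyncio.run(main())
```

Awaiting (or `gather`-ing) the pipeline tasks is the simplest way to surface their failures in the caller's stack frame.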

Update #2

This is what I came up with:
https://gist.github.com/AndrewPashkin/04c287def6d165fc2832


1 Answer


I think the answer here is "You don't". I'm guessing you got this idea from David Beazley's famous coroutine/generator tutorial. In his tutorial, he's essentially using coroutines as a reversed generator pipeline. Instead of pulling data through the pipeline by iterating over generators, you push data into the pipeline using `gen_object.send()`. Your first example would look something like this using that notion of coroutines:

from string import letters

def coroutine(func):
    def start(*args,**kwargs):
        cr = func(*args,**kwargs)
        cr.next()
        return cr
    return start

@coroutine
def filter_upper(target):
    while True:
        letter = yield
        if letter.isupper():
            target.send(letter)

@coroutine
def filter_selected(selected):
    selected = set(map(str.lower, selected))
    out = []
    try:
        while True:
            letter = yield
            if letter.lower() in selected:
                out.append(letter)
    except GeneratorExit:
        print out

def main():
    filt = filter_upper(filter_selected(['a', 'b', 'c']))
    for letter in letters:
        filt.send(letter)
    filt.close()

if __name__ == "__main__":
    main()

Now, coroutines in `asyncio` are similar, in that they are suspendable generator objects that can have data sent into them, but they really aren't suited to the data-pipelining use case at all. They're meant to enable concurrency while you're doing blocking I/O operations. The `yield from` suspension points allow control to return to the event loop while the I/O happens, and the event loop restarts the coroutine when it completes, sending the data returned by the I/O call into the coroutine. There's really no practical reason to try to use them for this kind of use case, since no blocking I/O is happening at all.
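As a side note not in the original answer, here is a tiny Python 3 sketch of what `asyncio` coroutines *are* for (`fetch` is a hypothetical stand-in for a blocking I/O call): the suspension point lets two simulated I/O waits overlap instead of running back to back.

```python
import asyncio
import time


async def fetch(name, delay):
    # The await hands control back to the event loop while the
    # (simulated) I/O is in flight; the loop resumes us when it's done.
    await asyncio.sleep(delay)
    return name


async def main():
    start = time.monotonic()
    # Both waits overlap, so this takes about 0.2s rather than 0.3s.
    results = await asyncio.gather(fetch("a", 0.1), fetch("b", 0.2))
    print(results, round(time.monotonic() - start, 1))


asyncio.run(main())
```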

Additionally, the problem with your `asyncio` attempt is that `a = yield from coro()` assigns `a` the return value of `coro`. But you're not actually returning anything from `coro`. You're caught somewhere between making `coro` a true coroutine and a generator. It looks like you expect `yield from future` to send the contents of `future` from `coro` to `coro2`, but that's not how coroutines work. `yield from` is used to pull data from a coroutine/`Future`/`Task`, and `return` is used to actually send an object back to the caller. So, for `coro` to actually return something to `coro2`, you need to do this:

@asyncio.coroutine
def coro():
    for e in ['a', 'b', 'c']:
        future = asyncio.Future()
        future.set_result(e)
        return future

But that will just return `'a'` to `coro2`. I think to get the output you're expecting, you'd need to do this:

@asyncio.coroutine
def coro():
    future = asyncio.Future()
    future.set_result(['a', 'b', 'c'])
    return future

This hopefully illustrates why `asyncio` coroutines are not what you want here.

Edit:

OK, given a case where you want to use pipelining in addition to doing actual asynchronous I/O, I think the approach you used in your update is good. As you suggested, it can be made simpler by creating a data structure that helps automate the queue management:

class Pipeline(object):
    def __init__(self, *nodes):
        if len(nodes) < 2:
            raise Exception("Need at least two nodes in the pipeline")
        self.start = asyncio.Queue()
        in_ = self.start
        for node in nodes:
            out = asyncio.Queue()
            asyncio.async(node(in_, out))
            in_ = out

    @asyncio.coroutine
    def put(self, val):
        yield from self.start.put(val)

# ... (most code is unchanged)

@asyncio.coroutine
def printer(input_, output): 
    # For simplicity, I have the sink taking an output queue. It's not being used,
    # but you could make the final output queue accessible from the Pipeline object
    # and then add a get() method to the `Pipeline` itself.
    while True:
        print('Pipeline output: ', (yield from input_.get()))


@asyncio.coroutine
def tcp_echo_client(loop):
    reader, writer = yield from asyncio.open_connection('127.0.0.1', 5555,
                                                        loop=loop)
    pipe = Pipeline(upper, glue, printer)

    while True:
        data = yield from reader.read(100)
        if not data:
            break
        print('Data: ', data)

        for byte in data.decode('utf-8'):
            yield from pipe.put(byte)  # Add to the pipe

    print('Close the socket')
    writer.close()

This simplifies the `Queue` management, but it doesn't address the exception-handling issue. I'm not sure much can be done about that...
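For completeness, here is a self-contained Python 3 adaptation of the `Pipeline` sketch above (my own sketch: the `tasks` list, `end` attribute, and `get()` accessor are additions, following the suggestion in the sink's comment to expose the final output queue):

```python
import asyncio


class Pipeline:
    """Chain coroutine stages with auto-created connecting queues."""

    def __init__(self, *nodes):
        if len(nodes) < 2:
            raise ValueError("need at least two nodes in the pipeline")
        self.start = asyncio.Queue()
        self.tasks = []  # keep handles so stages can be awaited or cancelled
        in_ = self.start
        for node in nodes:
            out = asyncio.Queue()
            self.tasks.append(asyncio.ensure_future(node(in_, out)))
            in_ = out
        self.end = in_  # final queue, so callers can pull results back out

    async def put(self, val):
        await self.start.put(val)

    async def get(self):
        return await self.end.get()


async def upper(inq, outq):
    while True:
        c = await inq.get()
        await outq.put(c.upper())


async def glue(inq, outq):
    chunk = []
    while True:
        chunk.append(await inq.get())
        if len(chunk) == 3:
            await outq.put(chunk)
            chunk = []


async def main():
    pipe = Pipeline(upper, glue)
    for ch in "abcdef":
        await pipe.put(ch)
    print(await pipe.get())
    print(await pipe.get())
    for t in pipe.tasks:
        t.cancel()


asyncio.run(main())
```

The two `get()` calls yield the batches `['A', 'B', 'C']` and `['D', 'E', 'F']`; cancelling the stage tasks at the end keeps shutdown clean.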

Answered 2014-12-01T17:03:53.673