考虑这段代码:
#!/usr/bin/env python
# coding=utf-8
from string import letters
def filter_upper(letters):
    """Lazily yield the uppercase items of *letters*, in their original order.

    Each item is tested with its own ``isupper()`` method; items for which
    it is false are skipped.
    """
    for candidate in letters:
        if not candidate.isupper():
            continue
        yield candidate
def filter_selected(letters, selected):
    """Lazily yield the items of *letters* whose lowercase form is selected.

    The entries of *selected* are lowercased into a set before the first
    item is examined, so matching is case-insensitive while every yielded
    item keeps its original case.
    """
    wanted = {str.lower(choice) for choice in selected}
    for candidate in letters:
        if candidate.lower() not in wanted:
            continue
        yield candidate
def main():
    """Run the demo pipeline and print its output as a list.

    ``letters`` is first passed through ``filter_upper`` and the result is
    then narrowed by ``filter_selected`` to the letters a, b and c.
    """
    uppercase_only = filter_upper(letters)
    chosen = filter_selected(uppercase_only, ['a', 'b', 'c'])
    print(list(chosen))
# Run the demo pipeline unconditionally (there is no __main__ guard).
main()
这是一个由生成器构建的管道的示例。我在实践中经常使用这种模式来构建数据处理流程。这就像 UNIX 管道。
将这些生成器重构为在每次 `yield` 处暂停执行的协程,最优雅的方法是什么?
更新
我的第一次尝试是这样的:
#!/usr/bin/env python
# coding=utf-8
import asyncio
@asyncio.coroutine
def coro():
    """Wait on an already-resolved future for each of 'a', 'b' and 'c'.

    The value produced by each ``yield from`` is discarded and there is no
    ``return`` statement, so the coroutine itself finishes with ``None``.
    """
    for value in ('a', 'b', 'c'):
        resolved = asyncio.Future()
        resolved.set_result(value)
        yield from resolved
@asyncio.coroutine
def coro2():
    """Delegate to ``coro()`` and print the value it returns."""
    # ``yield from`` evaluates to the return value of coro(), not to the
    # values of the futures that coro() waited on internally.
    a = yield from coro()
    print(a)
# Obtain the event loop and block until coro2() has finished.
loop = asyncio.get_event_loop()
loop.run_until_complete(coro2())
但由于某种原因它不起作用——变量 `a` 变为 `None`。
更新#1
我最近想出的:
服务器:
#!/usr/bin/env python
# coding=utf-8
"""Server that accepts a client and send it strings from user input."""
import socket
# Listening socket: IPv4 / TCP.  An empty host string binds to every local
# interface.
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = ''
port = 5555
conn = None
try:
    s.bind((host, port))
    s.listen(1)
    print('Listening...')
    # Block until a single client connects.
    conn, addr = s.accept()
    print('Client ({}) connected.'.format(addr))
    while True:
        # sendall() keeps writing until the whole string has been handed to
        # the network stack; plain send() may stop after a partial write.
        conn.sendall(raw_input('Enter data to send: '))
finally:
    # Release both sockets on every exit path (Ctrl-C, EOF on stdin, or a
    # broken connection) instead of leaking them.
    if conn is not None:
        conn.close()
    s.close()
客户端:
#!/usr/bin/env python
# coding=utf-8
"""Client that demonstrates processing pipeline."""
import trollius as asyncio
from trollius import From
@asyncio.coroutine
def upper(input, output):
    """Forever move items from *input* to *output*, uppercasing each one.

    Both arguments are used as queues: items are awaited from
    ``input.get()`` and the uppercased result is awaited into
    ``output.put()``.
    """
    while True:
        received = yield From(input.get())
        print('Got char: ', received)
        uppercased = received.upper()
        yield From(output.put(uppercased))
@asyncio.coroutine
def glue(input, output):
    """Forever group items from *input* into lists of three for *output*.

    Items are awaited from ``input.get()`` one at a time; once three have
    been collected the list is awaited into ``output.put()`` and a fresh
    list is started.
    """
    batch = []
    while True:
        item = yield From(input.get())
        batch.append(item)
        print('Current chunk: ', batch)
        if len(batch) != 3:
            continue
        yield From(output.put(batch))
        # Rebind rather than clear: the list just queued must stay intact.
        batch = []
@asyncio.coroutine
def tcp_echo_client(loop):
reader, writer = yield From(asyncio.open_connection('127.0.0.1', 5555,
loop=loop))
q1 = asyncio.Queue()
q2 = asyncio.Queue()
q3 = asyncio.Queue()
@asyncio.coroutine
def printer():
while True:
print('Pipeline ouput: ', (yield From(q3.get())))
asyncio.async(upper(q1, q2))
asyncio.async(glue(q2, q3))
asyncio.async(printer())
while True:
data = yield From(reader.read(100))
print('Data: ', data)
for byte in data:
yield From(q1.put(byte))
print('Close the socket')
writer.close()
@asyncio.coroutine
def background_stuff():
    """Forever sleep for three seconds, then print a status message."""
    interval = 3
    while True:
        yield From(asyncio.sleep(interval))
        print('Other background stuff...')
loop = asyncio.get_event_loop()
asyncio.async(background_stuff())
loop.run_until_complete(tcp_echo_client(loop))
loop.close()
与“David Beazley 的协程”相比,优势在于:您可以在这类带有 `input` 和 `output` 队列的处理单元内部使用 `asyncio` 提供的所有功能。
这里的缺点是:连接各个管道单元需要很多队列实例。这个问题可以通过使用比 `asyncio.Queue` 更高级的数据结构来解决。
另一个缺点是这种处理单元不会将它们的异常传播到父堆栈帧,因为它们是“后台任务”,而“David Beazley 的协程”会传播。
更新#2
这就是我想出的:
https://gist.github.com/AndrewPashkin/04c287def6d165fc2832