24

我有一个Connection用于包含读取和写入asyncio连接流的对象:

class Connection(object):

    def __init__(self, stream_in, stream_out):
        object.__init__(self)

        self.__in = stream_in
        self.__out = stream_out

    def read(self, n_bytes : int = -1):
        return self.__in.read(n_bytes)

    def write(self, bytes_ : bytes):
        self.__out.write(bytes_)
        yield from self.__out.drain()

在服务器端,每次客户端连接时connected创建一个对象,然后读取 4 个字节。Connection

@asyncio.coroutine
def new_conection(stream_in, stream_out):
    conn = Connection(stream_in, stream_out)
    data = yield from conn.read(4)
    print(data)

在客户端,写出 4 个字节。

@asyncio.coroutine
def client(loop):
    ...
    conn = Connection(stream_in, stream_out)
    yield from conn.write(b'test')

这几乎可以按预期工作,但我必须yield from每次都read打电话write。我试过yield from从里面 ing Connection

def read(self, n_bytes : int = -1):
    data = yield from self.__in.read(n_bytes)
    return data

但我没有得到数据,而是得到一个输出

<generator object StreamReader.read at 0x1109983b8>

如果我从多个地方打电话readwrite我不想yield from每次都重复 s;而是将它们留在里面Connection。我的最终目标是将我的功能缩减new_conection为:

@asyncio.coroutine
def new_conection(stream_in, stream_out):
    conn = Connection(stream_in, stream_out)
    print(conn.read(4))
4

2 回答 2

5

因为StreamReader.read是 coroutine,所以调用它的唯一选择是 a) 将它包装在 a TaskorFuture中并通过事件循环运行它,b)await从用 定义的协程中调用它async def,或 c)yield from从定义为修饰函数的协程中使用它与@asyncio.coroutine.

由于Connection.read是从事件循环中调用的(通过 coroutine new_connection),因此您不能重用该事件循环来运行 aTaskFuturefor StreamReader.read事件循环在它们已经运行时无法启动。您要么必须停止事件循环(灾难性的并且可能无法正确执行),要么创建一个新的事件循环(混乱并违背使用协程的目的)。这些都不是可取的,因此Connection.read需要是协程或async函数。

其他两个选项(awaitasync def协程或-decorated 函数中)大多是等效的yield from@asyncio.coroutine唯一的区别是async defandawait是在 Python 3.5 中添加的,因此对于 3.4,使用yield fromand@asyncio.coroutine是唯一的选择(协程asyncio在 3.4 之前不存在,因此其他版本无关紧要)。就个人而言,我更喜欢使用async defand await,因为使用 with 定义协程async def比使用装饰器更清晰。

简而言之:haveConnection.readnew_connectionbe 协程(使用装饰器或async关键字),并在调用其他协程(in和in )时使用await(or )。yield fromawait conn.read(4)new_connectionawait self.__in.read(n_bytes)Connection.read

于 2017-09-03T03:47:54.237 回答
2

我在第 620 行发现了一段StreamReader 源代码实际上是该函数用法的完美示例。

在我之前的回答中,我忽略了一个事实,即self.__in.read(n_bytes)它不仅是一个协程(考虑到它来自模块 XD,我应该知道它asyncio),而且它会在 line 上产生结果。所以它实际上是一个生成器,你需要从中屈服。

从源代码中借用这个循环,您的读取函数应该如下所示:

def read(self, n_bytes : int = -1):
    data = bytearray() #or whatever object you are looking for
    while 1:
        block = yield from self.__in.read(n_bytes)
        if not block:
            break
        data += block
    return data

因为self.__in.read(n_bytes)它是一个生成器,所以你必须继续从中产生,直到它产生一个空结果来表示读取结束。现在您的读取函数应该返回数据而不是生成器。您不必从这个版本的conn.read().

于 2017-08-30T13:21:23.550 回答