2

如何使协程停止超时?

我不明白为什么 asyncio.wait_for() 对我不起作用。我有这样一段代码(计划实现我的 telnet 客户端):

def expect(self, pattern, timeout=20): 
    if type(pattern) == str:
        pattern = pattern.encode('ascii', 'ignore')        
    return self.loop.run_until_complete(asyncio.wait_for(self.asyncxpect(pattern), timeout))

async def asyncxpect(self, pattern): #receives data in a cumulative way until match is found
    regexp = re.compile(b'(?P<payload>[\s\S]*)(?P<pattern>%s)' %pattern)
    self.buffer = b''
    while True:
        # add timeout
        # add exception handling for unexpectedly closed connections
        data = await self.loop.sock_recv(self.sock, 10000) 
        self.buffer += data
        m = re.match(regexp, self.buffer)
        if m:
            payload = m.group('payload')
            match = m.group('pattern')
            return payload, match 

正如我所认为的这段代码,在某些时候(在等待语句中)将控制权返回给事件循环。我认为当没有更多数据要接收时应该发生这种情况。如果事件循环有控制权,它可以超时停止。

但是如果服务器没有发送任何有用的(匹配的)我的代码就会在这个循环中绊倒,就在等待点。

我认为它与Python asyncio force timeout这个问题不同,因为我没有使用像 time.sleep(n) 这样的阻塞语句。

这是我的代码

4

1 回答 1

4

当服务器关闭连接时,sock_recv返回一个空字节数组(b''),表示文件结束。由于您不处理该条件,因此您的代码最终会陷入处理同一缓冲区的无限循环中。

要更正它,请添加以下内容:

if data == b'':
    break

...在data = await loop.sock_recv(...)线路之后。

但是上面仍然没有解释为什么wait_for无法取消流氓协程。问题在于,await这并不意味着“将控制权传递给事件循环”,正如有时所理解的那样。它的意思是“从提供的可等待对象请求值,如果(并且只要)对象表明它没有准备好值,则将控制权交给事件循环。” if是至关重要的:如果对象在第一次被询问时确实有一个值准备好,则该值将立即使用,而不会推迟到事件循环。换句话说,await 不保证事件循环将有机会运行。

例如,以下协程完全阻塞了事件循环并阻止任何其他协程运行,尽管它的内部循环只包含等待

async def busy_loop():
    while True:
        await noop()

async def noop():
    pass

在您的示例中,由于套接字在文件末尾时根本不会阻塞,因此协程永远不会暂停,并且(与上述错误勾结)您的协程永远不会退出。

为确保其他任务有机会运行,您可以添加await asyncio.sleep(0)循环。对于大多数代码来说,这应该不是必需的,因为请求 IO 数据很快就会导致等待,此时事件循环将启动。(实际上,需要这样做通常表明存在设计缺陷。)在这种情况下,它是仅结合代码卡住的EOF处理错误。

于 2018-02-15T21:09:10.027 回答