5

Socket 模块有一个socket.recv_into方法,因此它可以使用用户定义的bytebuffer(如bytearray)进行零拷贝。但也许BaseEventLoop没有那样的方法。有没有办法使用像socket.recv_intoasyncio 这样的方法?

4

3 回答 3

1

可以实现自己的 asyncio 传输,它利用.recv_into()函数,但是是的,现在 asyncio 还没有一种.recv_into()开箱即用的方法。

我个人怀疑是否有很大的加速:当你用 C 开发时,零拷贝非常重要,但对于像 Python 这样的高级语言来说,好处要小得多。

于 2015-01-20T16:57:20.983 回答
1

定义的低级套接字操作BaseEventLoop需要传入一个socket.socket对象,例如BaseEventLoop.sock_recv(sock, nbytes). 因此,鉴于您有socket.socket,您可以调用sock.recv_into()。这样做是否是一个好主意是另一个问题。

于 2015-01-20T12:28:34.630 回答
0

更新:从 Python 3.7.0 开始,在我写这篇文章时它是 alpha 版本,标准库的 asyncio 模块文档 AbstractEventLoop.sock_recv_into()。

编辑:根据要求扩展答案...

对 asyncio 的调用sock_recv_into()通常如下所示:

byte_count = await loop.sock_recv_into(sock, buff)

buff 是一个可变对象,它实现了 Python 的缓冲区协议,其示例包括字节数组和字节数组上的内存视图。下面的代码演示了使用 memoryview 接收到字节数组中。

异步套接字的工作演示代码必须包含一堆脚手架来设置连接的两侧并运行事件循环。这里的重点是在下面的 sock_read_exactly() 协程中使用 asyncio 的 sock_recv_into()。

#!/usr/bin/env python3

"""Demo the asyncio module's sock_recv_into() facility."""

import sys
assert sys.version_info[:2] >= (3, 7), (
        'asyncio sock_recv_into() new in Python 3.7')

import socket
import asyncio

def local_echo_server(port=0):
    """Trivial treaded echo server with sleep delay."""
    import threading
    import time
    import random
    ssock = socket.socket()
    ssock.bind(('127.0.0.1', port))
    _, port = ssock.getsockname()
    ssock.listen(5)
    def echo(csock):
        while True:
            data = csock.recv(8192)
            if not data:
                break
            time.sleep(random.random())
            csock.sendall(data)
        csock.shutdown(1)
    def serve():
        while True:
            csock, client_addr = ssock.accept()
            tclient = threading.Thread(target=echo, args=(csock,), daemon=True)
            tclient.start()
    tserve = threading.Thread(target=serve, daemon=True)
    tserve.start()
    return port


N_COROS = 100
nrunning = 0

async def sock_read_exactly(sock, size, loop=None):
    "Read and return size bytes from sock in event-loop loop."
    if loop is None: loop = asyncio.get_event_loop()
    bytebuff = bytearray(size)
    sofar = 0
    while sofar < size:
        memview = memoryview(bytebuff)[sofar:]
        nread = await loop.sock_recv_into(sock, memview)
        print('socket', sock.getsockname(), 'read %d bytes' % nread)
        if not nread:
            raise RuntimeError('Unexpected socket shutdown.')
        sofar += nread
    return bytebuff

async def echo_client(port):
    "Send random data to echo server and test that we get back the same."
    from os import urandom
    global nrunning
    loop = asyncio.get_event_loop()
    sock = socket.socket()
    sock.setblocking(False)
    await loop.sock_connect(sock, ('127.0.0.1', port))
    for size in [1, 64, 1024, 55555]:
        sending = urandom(size)
        await loop.sock_sendall(sock, sending)
        received = await sock_read_exactly(sock, size)
        assert received == sending
    nrunning -= 1
    if not nrunning:
        loop.stop()


if __name__ == '__main__':
    port = local_echo_server()
    print('port is', port)
    loop = asyncio.get_event_loop()
    for _ in range(N_COROS):
        loop.create_task(echo_client(port))
        nrunning += 1
    print('Start loop.')
    loop.run_forever()
于 2017-12-19T16:42:44.963 回答