Socket 模块有一个socket.recv_into
方法,因此它可以使用用户定义的bytebuffer
(如bytearray
)进行零拷贝。但也许BaseEventLoop
没有那样的方法。有没有办法使用像socket.recv_into
asyncio 这样的方法?
3 回答
您可以实现自己的 asyncio 传输,它利用.recv_into()
函数,但是是的,现在 asyncio 还没有一种.recv_into()
开箱即用的方法。
我个人怀疑是否有很大的加速:当你用 C 开发时,零拷贝非常重要,但对于像 Python 这样的高级语言来说,好处要小得多。
定义的低级套接字操作BaseEventLoop
需要传入一个socket.socket
对象,例如BaseEventLoop.sock_recv(sock, nbytes)
. 因此,鉴于您有socket.socket
,您可以调用sock.recv_into()
。这样做是否是一个好主意是另一个问题。
更新:从 Python 3.7.0 开始,在我写这篇文章时它是 alpha 版本,标准库的 asyncio 模块文档 AbstractEventLoop.sock_recv_into()。
编辑:根据要求扩展答案...
对 asyncio 的调用sock_recv_into()
通常如下所示:
byte_count = await loop.sock_recv_into(sock, buff)
buff 是一个可变对象,它实现了 Python 的缓冲区协议,其示例包括字节数组和字节数组上的内存视图。下面的代码演示了使用 memoryview 接收到字节数组中。
异步套接字的工作演示代码必须包含一堆脚手架来设置连接的两侧并运行事件循环。这里的重点是在下面的 sock_read_exactly() 协程中使用 asyncio 的 sock_recv_into()。
#!/usr/bin/env python3
"""Demo the asyncio module's sock_recv_into() facility."""
import sys
assert sys.version_info[:2] >= (3, 7), (
'asyncio sock_recv_into() new in Python 3.7')
import socket
import asyncio
def local_echo_server(port=0):
"""Trivial treaded echo server with sleep delay."""
import threading
import time
import random
ssock = socket.socket()
ssock.bind(('127.0.0.1', port))
_, port = ssock.getsockname()
ssock.listen(5)
def echo(csock):
while True:
data = csock.recv(8192)
if not data:
break
time.sleep(random.random())
csock.sendall(data)
csock.shutdown(1)
def serve():
while True:
csock, client_addr = ssock.accept()
tclient = threading.Thread(target=echo, args=(csock,), daemon=True)
tclient.start()
tserve = threading.Thread(target=serve, daemon=True)
tserve.start()
return port
N_COROS = 100
nrunning = 0
async def sock_read_exactly(sock, size, loop=None):
"Read and return size bytes from sock in event-loop loop."
if loop is None: loop = asyncio.get_event_loop()
bytebuff = bytearray(size)
sofar = 0
while sofar < size:
memview = memoryview(bytebuff)[sofar:]
nread = await loop.sock_recv_into(sock, memview)
print('socket', sock.getsockname(), 'read %d bytes' % nread)
if not nread:
raise RuntimeError('Unexpected socket shutdown.')
sofar += nread
return bytebuff
async def echo_client(port):
"Send random data to echo server and test that we get back the same."
from os import urandom
global nrunning
loop = asyncio.get_event_loop()
sock = socket.socket()
sock.setblocking(False)
await loop.sock_connect(sock, ('127.0.0.1', port))
for size in [1, 64, 1024, 55555]:
sending = urandom(size)
await loop.sock_sendall(sock, sending)
received = await sock_read_exactly(sock, size)
assert received == sending
nrunning -= 1
if not nrunning:
loop.stop()
if __name__ == '__main__':
port = local_echo_server()
print('port is', port)
loop = asyncio.get_event_loop()
for _ in range(N_COROS):
loop.create_task(echo_client(port))
nrunning += 1
print('Start loop.')
loop.run_forever()