我在处理异步事件循环中的一些 CPU 密集型任务时遇到了问题。在处理传入数据的缓冲区并从中构建数据包时,我遇到了麻烦。我尝试使用执行程序来执行 CPU 绑定的工作,但是当从缓冲区中删除数据包时,我无法维护缓冲区的顺序。
我正在寻找一种最佳实践方法来实现以下功能,而无需在事件循环中执行 CPU 绑定任务。
import asyncio
import struct
class Reader(asyncio.Protocol):
def __init__(self):
self.extra = bytearray()
def data_received(self, data):
self.extra.extend(data)
packet = get_packet(bytes(self.extra))
if packet:
del self.extra[:len(packet)]
if verify_hash(packet): # CPU intensive
asyncio.async(distribute(packet)) # Some asyncio fan-out callback
def get_packet(data): # CPU intensive
if len(data) > HEADER_SIZE:
payload_size, = struct.unpack_from(HEADER_FORMAT, data)
if len(data) >= HEADER_SIZE + payload_size:
return data[:HEADER_SIZE + payload_size]
return None
loop = asyncio.get_event_loop()
loop.run_until_complete(loop.create_server(Reader, '0.0.0.0', 8000))
loop.run_forever()