我正在尝试在我自己的应用服务器中实现 http/2 堆栈,该服务器是我使用 asyncio 从头开始构建的。据我了解,asyncio 在内部维护了一个“任务”队列,事件循环使用该队列来运行任务。现在,为了实现流优先级,我需要能够比低优先级任务运行更长时间的高优先级任务(我在想的任务,调用应用程序(范围,接收,发送)返回的协程为根据ASGI 规范。)我无法找到一种方法来确定 asyncio 使用的内部队列的优先级。
我什至考虑过捕获我得到的事件字典作为应用程序(范围,接收,发送)中可调用发送的参数,但 asgi 规范说“协议服务器必须在从发送调用“返回之前将传递给它们的任何数据刷新到发送缓冲区。这里的“发送缓冲区”是什么意思?是操作系统/内核发送缓冲区吗?
我是否在错误地考虑流优先级?什么是实现这一点的好方法?
class Worker(object):
def get_asgi_event_dict(self, frame):
event_dict = {
"type": "http",
"asgi": {"version": "2.0", "spec_version": "2.1"},
"http_version": "2",
"method": frame.get_method(),
"scheme": "https",
"path": frame.get_path(),
"query_string": "",
"headers": [
[k.encode("utf-8"), v.encode("utf-8")] for k, v in frame.headers.items()
],
}
return event_dict
async def handle_request(self):
try:
while True:
self.request_data = self.client_connection.recv(4096)
self.frame = self.parse_request(self.request_data)
if isinstance(self.frame, HeadersFrame):
if self.frame.end_stream:
current_stream = Stream(
self.connection_settings,
self.header_encoder,
self.header_decoder,
self.client_connection,
)
current_stream.stream_id = self.frame.stream_id
asgi_scope = self.get_asgi_event_dict(self.frame)
current_stream.asgi_app = self.application(asgi_scope)
# The next line puts the coroutine obtained from a call to
# current_stream.asgi_app on the "tasks" queue and hence
# out of my control to prioritize.
await current_stream.asgi_app(
self.trigger_asgi_application, current_stream.send_response
)
else:
self.asgi_scope = self.get_asgi_event_dict(self.frame)
except Exception:
print("Error occurred in handle_request")
print((traceback.format_exc()))
class Stream(object):
async def send_response(self, event):
# converting the event dict into http2 frames and sending them to the client.