3

我正在 Python 3.6 中实现简单的 asyncio HTTP2 服务器和客户端。它需要实现流量控制。我已经使用函数 self.outbound_flow_control_window=2048 在客户端将流量控制窗口设置为 2048 字节,在客户端发送 2048 字节数据块后,服务器不处理并确认接收到的数据,因此客户端可以发送另一块 2048字节

我已经尝试过这些功能,self.conn.acknowledge_received_data(2048, event.stream_id) self.conn.increment_flow_control_window(2048, event.stream_id)

elif isinstance(event, DataReceived):
    self.receive_data(event.data, event.stream_id)
    self.conn.acknowledge_received_data(2048, event.stream_id)
    self.conn.increment_flow_control_window(2048, event.stream_id)

从客户端接收数据(2048 字节)后,我希望服务器确认并更新客户端它现在可以发送更多数据,但客户端上的 flow_control_windows 仍然为 0,即使在接收到窗口更新帧之后也是如此

4

1 回答 1

3

你有没有流控制工作的示例服务器吗?如果不这样做。

https://github.com/python-hyper/hyper-h2/blob/master/examples/asyncio/asyncio-server.py

您正在混合手动和自动流量控制。重读此处的自动流量控制部分并使用自动控制。

https://python-hyper.org/projects/h2/en/stable/advanced-usage.html

这种自动策略是围绕一个方法构建的:acknowledge_received_data。此方法向连接对象标记您的应用程序已经处理了一定数量的流控制字节,并且窗口应该以某种方式递增。每当您的应用程序“处理”了一些接收到的字节时,都应该调用此方法来表示它们已被处理。

此方法与 increment_flow_control_window 之间的主要区别在于,方法确认接收数据不保证它会发出 WINDOW_UPDATE 帧,并且如果它保证它不一定只为流或帧发出它们。相反,WINDOW_UPDATE 帧将被合并:只有在释放一定数量的字节时才会发出它们。

现在看看使用流控制的古玩示例。如果您从服务器接收窗口更新事件,则可能您没有正确处理流 id 0。

https://github.com/python-hyper/hyper-h2/blob/master/examples/curio/curio-server.py

特别是发送数据功能:

 while True:
        while not self.conn.local_flow_control_window(stream_id):
            await self.wait_for_flow_control(stream_id)

        chunk_size = min(
            self.conn.local_flow_control_window(stream_id),
            READ_CHUNK_SIZE,
        )

        data = fileobj.read(chunk_size)
        keep_reading = (len(data) == chunk_size)

        self.conn.send_data(stream_id, data, not keep_reading)
        await self.sock.sendall(self.conn.data_to_send())

如果要发送 4k 字节,请在流控制窗口中等待,请发送 2k 字节,然后在流控制窗口中再次等待。

如果您收到窗口更新,您应该有这样的代码

async def window_updated(self, event):
    """
    Unblock streams waiting on flow control, if needed.
    """
    stream_id = event.stream_id

    if stream_id and stream_id in self.flow_control_events:
        evt = self.flow_control_events.pop(stream_id)
        await evt.set()
    elif not stream_id:
        # Need to keep a real list here to use only the events present at
        # this time.
        blocked_streams = list(self.flow_control_events.keys())
        for stream_id in blocked_streams:
            event = self.flow_control_events.pop(stream_id)
            await event.set()
    return
于 2019-02-10T18:49:24.427 回答