5

我正在编写一个简单的测速程序,用于测量两台设备之间的吞吐量。有一个 Python 脚本,可以作为客户端或服务器端启动:客户端生成流量,服务器测量它接收到的流量,就这么简单。但不知为何,即使我在同一台机器上同时运行服务器和客户端,吞吐量的上限也只有约 115 兆比特/秒。在同一台机器上,当我生成 TCP 或 UDP 流量时,吞吐量约为 30 Gb/s,因此带宽不是问题。我使用的库是 aioquic,可惜它几乎没有文档。问题是:如何提高吞吐量?我主要担心 #HERE 注释附近的那段代码。看起来 transmit() 是一个阻塞函数,但如果没有那种暂停/继续机制或某种粗糙的 sleep(),程序就会挂起,或者产生更低的流量。

from connector import * #my base class
import asyncio as asc
import aioquic.asyncio
from aioquic.quic.configuration import QuicConfiguration
import ssl

class QUIC_client(client):
    """QUIC traffic generator built on aioquic.

    Opens a single QUIC stream to the server and writes newline-terminated
    payloads in bursts. After every burst it sends ``PAUSE`` and waits for
    the server to answer ``CONTINUE`` before sending the next burst, until
    the requested test duration has elapsed.
    """

    def __init__(self, host, port, use_json):
        """Prepare the payload line and the client-side QUIC configuration.

        All three arguments are forwarded unchanged to the base class.
        """
        super().__init__(host, port, use_json)
        self.traffic_type = 'quic'

        # One payload is a single line of quic_payload_size bytes: filler
        # bytes followed by the terminating newline.
        # NOTE(review): quic_payload_size is not set in this class;
        # presumably it is provided by the base class - confirm.
        filler_length = self.quic_payload_size - 1
        self.payload = b'p' * filler_length + b'\n'

        # Certificate verification is disabled for this client.
        self.config = QuicConfiguration(is_client=True, verify_mode=ssl.CERT_NONE)

    async def _maintask(self, time):
        """Connect, stream payload bursts for ``time`` seconds, then stop.

        ``time`` is multiplied by 1000 and added to ``millis()`` to obtain
        the deadline, so it is expressed in seconds.
        """
        self._infomessage(f'connecting to server {self.host}:{self.port}')
        connection = aioquic.asyncio.connect(
            host=self.host,
            port=self.port,
            configuration=self.config,
        )
        async with connection as protocol:
            await protocol.wait_connected()
            self._infomessage(message='connection successful')
            stream_reader, stream_writer = await protocol.create_stream()
            self._infomessage(message='stream created, now transmitting')

            deadline = millis() + time * 1000
            while millis() < deadline:
                # One burst: 300 payload lines followed by a PAUSE marker.
                for _ in range(300):
                    stream_writer.write(self.payload)
                stream_writer.write(b'PAUSE\n')
                protocol.transmit()

                # HERE
                # Sending data over and over without a break made the
                # program hang on the client side, so each burst ends with
                # 'PAUSE' and the client waits for 'CONTINUE'. This is a
                # temporary solution; no way to simply wait until the send
                # has completed was found.
                reply = await stream_reader.readline()
                if reply != b'CONTINUE\n':
                    self._infomessage(message='connection closed')
                    break

            # Tell the server the test is over, then shut the connection.
            stream_writer.write(b'STOP\n')
            protocol.transmit()
            protocol.close()
            await protocol.wait_closed()
            self._infomessage(message='client finished')

    def run_test(self, time):
        """Run the base-class hook, then drive ``_maintask`` to completion."""
        super().run_test(time)
        event_loop = asc.get_event_loop()
        event_loop.run_until_complete(self._maintask(time))



class QUIC_server(server):
    """QUIC traffic sink built on aioquic.

    Accepts QUIC streams, counts the payload lines received on each one and
    periodically reports the measured throughput through ``_datamessage``.
    The control lines ``PAUSE`` and ``STOP`` are handled separately and are
    not counted as payload.
    """

    def __init__(self, port, interval, use_json):
        """Prepare the server-side QUIC configuration and the event loop.

        All three arguments are forwarded unchanged to the base class.
        The certificate chain is loaded from ``cert.pem`` / ``key.pem``
        relative to the current working directory.
        """
        super().__init__(port, interval, use_json)
        self.traffic_type = 'quic'
        self.config = QuicConfiguration(is_client=False)
        self.config.load_cert_chain('cert.pem', 'key.pem')
        self.loop = asc.get_event_loop()

    def _streamhandler(self, reader, writer):
        """Stream callback passed to ``serve``: start the async handler.

        The callback itself is synchronous, so the real work is scheduled
        as a task on the server's event loop.
        """
        self._infomessage(message='stream created')
        self.currentstreamtask = self.loop.create_task(
            self._currentstreamhandler(reader, writer)
        )

    async def _currentstreamhandler(self, reader, writer):
        """Read lines from one stream and report throughput per interval.

        An empty read is treated as an interrupted connection, ``STOP``
        stops the event loop, ``PAUSE`` is answered with ``CONTINUE``, and
        every other line is counted as one payload.
        """
        data_counter = 0
        timer = millis()
        while True:
            line = await reader.readline()
            if line == b'':
                self._infomessage(message='connection interrupted! now exitting', is_error=True)
                return
            elif line == b'STOP\n':
                self._infomessage('server finished')
                self.loop.stop()
                return
            elif line == b'PAUSE\n':
                writer.write(b'CONTINUE\n')
                # TODO find a better way to control data flow
            else:
                data_counter += 1
                # Read the clock once so the window start and the divisor
                # below refer to the same instant.
                now = millis()
                elapsed_ms = now - timer
                if elapsed_ms > self.interval * 1000:
                    # Divide by the time that actually elapsed rather than
                    # the nominal interval: this check only runs when a
                    # payload line arrives, so the real window is always
                    # somewhat longer than ``interval`` and using the
                    # nominal value would overestimate the bit rate.
                    # NOTE(review): every counted line is assumed to be
                    # quic_payload_size bytes long - confirm against the
                    # client payload.
                    bits_received = data_counter * self.quic_payload_size * 8
                    self._datamessage(bps_value=bits_received * 1000 / elapsed_ms)
                    timer = now
                    data_counter = 0

    def listen(self):
        """Run the base-class hook, start serving and block in the loop.

        The loop runs until ``_currentstreamhandler`` stops it on ``STOP``.
        """
        super().listen()
        try:
            self.server_task = self.loop.create_task(
                aioquic.asyncio.serve(
                    host='0.0.0.0',
                    port=self.port,
                    configuration=self.config,
                    stream_handler=self._streamhandler,
                )
            )
            self.loop.run_forever()
        except asc.CancelledError:
            print('cancelled error')

#basically when running a test
#QUIC_client or QUIC_server instance is created
#and then run_test() or listen() is called

4

0 回答 0