1

下面是通用 websocket 流媒体的(工作)代码。

它创建一个守护线程,从中执行asyncio.run(...).

asyncio 代码生成 2 个任务,这些任务永远不会完成。

如何正确销毁该对象?

其中一项任务是执行保活“ping”,因此我可以使用标志轻松退出该循环。但另一个是阻止来自 websocket 的消息。

import json
import aiohttp
import asyncio
import gzip

import asyncio
from threading import Thread

class WebSocket:
    KEEPALIVE_INTERVAL_S = 10

    def __init__(self, url, on_connect, on_msg):
        self.url = url
        self.on_connect = on_connect
        self.on_msg = on_msg

        self.streams = {}
        self.worker_thread = Thread(name='WebSocket', target=self.thread_func, daemon=True).start()

    def thread_func(self):
        asyncio.run(self.aio_run())

    async def aio_run(self):
        async with aiohttp.ClientSession() as session:

            self.ws = await session.ws_connect(self.url)

            await self.on_connect(self)

            async def ping():
                while True:
                    print('KEEPALIVE')
                    await self.ws.ping()
                    await asyncio.sleep(WebSocket.KEEPALIVE_INTERVAL_S)

            async def main_loop():
                async for msg in self.ws:
                    def extract_data(msg):
                        if msg.type == aiohttp.WSMsgType.BINARY:
                            as_bytes = gzip.decompress(msg.data)
                            as_string = as_bytes.decode('utf8')
                            as_json = json.loads(as_string)
                            return as_json

                        elif msg.type == aiohttp.WSMsgType.TEXT:
                            return json.loads(msg.data)

                        elif msg.type == aiohttp.WSMsgType.ERROR:
                            print('⛔️ aiohttp.WSMsgType.ERROR')

                        return msg.data

                    data = extract_data(msg)

                    self.on_msg(data)

            # May want this approach if we want to handle graceful shutdown
            # W.task_ping = asyncio.create_task(ping())
            # W.task_main_loop = asyncio.create_task(main_loop())

            await asyncio.gather(
                ping(),
                main_loop()
            )

    async def send_json(self, J):
        await self.ws.send_json(J)

4

2 回答 2

2

我建议使用 ofasyncio.run_coroutine_threadsafe而不是asyncio.run. 它返回一个concurrent.futures.Future您可以取消的对象:

def thread_func(self):
    self.future = asyncio.run_coroutine_threadsafe(
        self.aio_run(), 
        asyncio.get_event_loop()
    )

# somewhere else
self.future.cancel()

另一种方法是创建一个任务,pingmain_loop在必要时取消它们:

# inside `aio_run`
self.task_ping = asyncio.create_task(ping())
self.main_loop_task = asyncio.create_task(main_loop())

await asyncio.gather(
    self.task_ping,
    self.main_loop_task
    return_exceptions=True
)


# somewhere else
self.task_ping.cancel()
self.main_loop_task.cancel()

这不会改变aio_run也应该用 调用的事实asyncio.run_coroutine_threadsafeasyncio.run应该用作 asyncio 程序的主要入口点,并且应该只调用一次。

于 2021-06-06T20:34:22.033 回答
1

我想提出另一种解决方案的变体。在完成协程(任务)时,我更喜欢尽量减少使用cancel()(但不排除),因为有时它会使调试业务逻辑变得困难(请记住,asyncio.CancelledError它不继承自Exception)。

在您的情况下,代码可能如下所示(仅更改):

class WebSocket:
    KEEPALIVE_INTERVAL_S = 10

    def __init__(self, url, on_connect, on_msg):
        # ...      
        self.worker_thread = Thread(name='WebSocket', target=self.thread_func)
        self.worker_thread.start()

    async def aio_run(self):
        self._loop = asyncio.get_event_loop()
        # ...
 
        self._ping_task = asyncio.create_task(ping())
        self._main_task = asyncio.create_task(main_loop())

        await asyncio.gather(
            self._ping_task,
            self._main_task,
            return_exceptions=True
        )
        # ...

    async def stop_ping(self):
        self._ping_task.cancel()
        try:
            await self._ping_task
        except asyncio.CancelledError:
            pass

    async def _stop(self):
        # wait ping end before socket closing
        await self.stop_ping()
        # lead to correct exit from `async for msg in self.ws`
        await self.ws.close()

    def stop(self):
        # wait stopping ping and closing socket
        asyncio.run_coroutine_threadsafe(
            self._stop(), self._loop
        ).result() 
        self.worker_thread.join()  # wait thread finish
于 2021-06-07T12:35:28.297 回答