我正在用 Kivy 开发一个 UI 应用程序,想通过两个按钮来启动和停止一个基于 Python aiohttp 的服务器。我在另一篇帖子中找到了在线程中启动 aiohttp 服务器的示例(UI 应用程序本身占用主线程)。但我不确定如何停止在该线程中运行的 aiohttp 服务器,因为 loop.run_forever() 是一个阻塞调用。
提前致谢。
环境:Python 3.7.0,aiohttp 3.6.2,操作系统:Windows
代码是:
serverthread.py:
import asyncio
from aiohttp import web
def aiohttp_server():
    """Build the demo aiohttp application and wrap it in an ``AppRunner``.

    The application serves a single ``GET /`` route that answers with the
    plain text ``Hello, world``. The runner is returned without being set
    up; the caller is expected to do that on the event loop that will
    serve it.

    Returns:
        The ``web.AppRunner`` wrapping the newly created application.
    """
    print("aiohttp_server runner created")

    async def say_hello(request):
        """Answer every request with a fixed greeting."""
        return web.Response(text='Hello, world')

    routes = [web.get('/', say_hello)]
    application = web.Application()
    application.add_routes(routes)
    return web.AppRunner(application)
# Event loops currently blocked in ``run_forever`` inside ``run_server``,
# keyed by ``id(runner)``. ``stop_server`` is called from a different thread
# than the one running the loop, so it needs this to find the loop to stop.
_running_loops = {}


def run_server(runner):
    """Serve ``runner`` on localhost:8080 until ``stop_server`` is called.

    Intended to be used as a thread target: it creates a private event loop
    for the calling thread and blocks in ``loop.run_forever()``.

    Args:
        runner: Object exposing ``setup()`` and ``cleanup()`` coroutines that
            is accepted by ``web.TCPSite`` (the ``web.AppRunner`` returned by
            ``aiohttp_server``).

    Raises:
        Whatever ``runner.setup()`` or ``site.start()`` raise (for example
        when the port is already in use). Cleanup still runs in that case.
    """
    print("Entering run_server")
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(runner.setup())
        site = web.TCPSite(runner, 'localhost', 8080)
        loop.run_until_complete(site.start())
        # Publish the loop only once start-up has finished: a stop request
        # that arrives from now on is queued on the loop and honoured as
        # soon as run_forever() begins, instead of aborting the
        # run_until_complete() calls above.
        _running_loops[id(runner)] = loop
        print("Loop run_forever")
        loop.run_forever()
    finally:
        _running_loops.pop(id(runner), None)
        # A stopped loop can still run coroutines, so release the runner's
        # resources here, then close the loop itself.
        loop.run_until_complete(runner.cleanup())
        loop.close()


def stop_server(runner):
    """Ask the loop serving ``runner`` to stop; safe to call from any thread.

    The call only schedules ``loop.stop()`` on the server's loop and returns
    immediately. ``run_server`` then leaves ``run_forever()``, cleans up the
    runner and returns, which ends the server thread.

    Args:
        runner: The same runner object that was passed to ``run_server``.

    Returns:
        True if a stop request was scheduled, False if no loop is currently
        serving ``runner`` (never started, still starting, or already
        stopped).
    """
    print("Entering stop_server")
    loop = _running_loops.get(id(runner))
    if loop is None:
        return False
    try:
        # loop.stop() itself is not thread-safe; call_soon_threadsafe hands
        # it over to the loop's own thread and wakes the loop up.
        loop.call_soon_threadsafe(loop.stop)
    except RuntimeError:
        # The loop was closed between the lookup and the call.
        return False
    return True
UI 应用程序 my-ui.py:
from kivy.app import App
from kivy.uix.widget import Widget
from kivy.properties import ObjectProperty, NumericProperty
from serverthread import run_server, aiohttp_server
import threading
class ServerAdmin(Widget):
    """Root widget with handlers that start and stop the server thread."""

    # Number of server threads this widget considers running (0 or 1).
    active_threads = NumericProperty(0)
    # Thread executing ``run_server``. It is created in ``start_server``
    # rather than here: a ``threading.Thread`` can only be started once, and
    # building it at class level would call ``aiohttp_server()`` at import
    # time and share one thread between all instances.
    t = ObjectProperty(None, allownone=True)
    # Runner handed to the current server thread; needed to stop it again.
    runner = ObjectProperty(None, allownone=True)

    def start_server(self):
        """Start the server in a daemon thread unless one is already active."""
        if self.active_threads != 0:
            print("Number of active threads: ", str(self.active_threads))
            return
        print("starting server...")
        self.runner = aiohttp_server()
        self.t = threading.Thread(
            target=run_server, args=(self.runner,), daemon=True
        )
        self.t.start()
        self.active_threads += 1
        print("Server thread started.")

    def stop_server(self):
        """Request a server stop and reset state once the thread has ended."""
        print("stopping server...")
        if self.t is None:
            print("Server thread was never started.")
            return
        # Thread.isAlive() was removed in Python 3.9; is_alive() is the
        # supported spelling and prints the same True/False text.
        print(f"Check if thread active: {self.t.is_alive()}, {self.t.name}")
        # Imported locally (and aliased) because this method has the same
        # name as the module-level function it delegates to.
        from serverthread import stop_server as request_stop
        request_stop(self.runner)
        # Wait only briefly so the UI thread is never blocked for long.
        self.t.join(timeout=1.0)
        if self.t.is_alive():
            print("Server thread is still running.")
            return
        # Drop the finished thread so start_server() can create a new one.
        self.t = None
        self.runner = None
        self.active_threads = 0
        print("Server thread stopped.")
class MyUiApp(App):
    """Application whose root widget is a ``ServerAdmin`` instance."""

    def build(self):
        """Create and return the root widget of the application."""
        root_widget = ServerAdmin()
        return root_widget
# Launch the UI only when this file is executed as a script.
if __name__ == '__main__':
    ui_app = MyUiApp()
    ui_app.run()