我有一个长期运行的后台任务,它再次旋转烧瓶应用程序并在后台进行一些审核。前端是一个 Web 应用程序,使用 socketio 与后端主flask 应用程序通信以处理多个异步行为。
我确保仅在创建主线程时才触发后台任务,并且eventlet.monkey_patch()
仅在脚本的最开头执行。
如果后台线程有很多东西要审核,它会阻塞主线程,内存中的东西越多,它阻塞主线程的时间就越长。审计根本不是 CPU 密集型的,它只是一些数据库插入和日志记录。
- 需要审计的项目从主线程添加到内存中的一个对象中,并通过引用传递给子线程。(就像一个内存队列)
- 如果我不猴子补丁 eventlet,那么一切正常,但烧瓶的自动重新加载将不起作用,我需要它进行开发。
- 我在 dev 中运行像 socketio.run(app) 这样的应用程序
- 使用 gunicorn/eventlet 时行为仍然存在
- 当后台任务正在休眠时
sleep(2)
,不会发生阻塞。
import eventlet
eventlet.monkey_patch()
# ... rest of code is a basic flask app create_app factory that at some # point starts the new thread if it's the main thread
# the async code that runs is the following
class AsyncAuditor(threading.Thread):
def __init__(self, tasks: list, stop: threading.Event):
super().__init__()
self.tasks = tasks
self.stop_event = stop
def run(self):
from app import init_app
from dal import db
app = init_app(mode='sys')
app.logger.info('starting async audit thread')
with app.app_context():
try:
while not self.stop_event.is_set():
if len(self.tasks) > 0:
task: dict
for task in self.tasks:
app.logger.debug(threading.current_thread().name + ' new audit record')
task.payload = encryptor.encrypt(task.payload)
task.ip = struct.unpack("!I", socket.inet_aton(task.ip))[0]
db.session.add(task)
self.tasks.clear()
db.session.commit()
sleep(2)
app.logger.info('exiting async audit thread')
except BaseException as e:
app.logger.exception('Exception')
# there's some code that tries to gracefully exit if app needs to exit
stop_event = threading.Event()
async_task = AsyncAuditor(API.audit_tasks, stop_event)
async_task.start()
def exit_async_thread():
stop_event.set()
async_task.join()
atexit.register(exit_async_thread)
我希望在子线程工作时,主线程不会被任何数据库操作阻塞,事实上,就像我之前提到的,如果我不猴子补丁 eventlet,那么在主线程和子线程中一切正常一个也是。相反,当后台任务正在运行时,我在烧瓶应用程序中遇到一个端点时会出现 9 甚至 30 秒的延迟。