1

我有一个长期运行的后台任务,它再次旋转烧瓶应用程序并在后台进行一些审核。前端是一个 Web 应用程序,使用 socketio 与后端主flask 应用程序通信以处理多个异步行为。

我确保仅在创建主线程时才触发后台任务,并且eventlet.monkey_patch()仅在脚本的最开头执行。

如果后台线程有很多东西要审核,它会阻塞主线程,内存中的东西越多,它阻塞主线程的时间就越长。审计根本不是 CPU 密集型的,它只是一些数据库插入和日志记录。

  • 需要审计的项目从主线程添加到内存中的一个对象中,并通过引用传递给子线程。(就像一个内存队列)
  • 如果我不猴子补丁 eventlet,那么一切正常,但烧瓶的自动重新加载将不起作用,我需要它进行开发。
  • 我在 dev 中运行像 socketio.run(app) 这样的应用程序
  • 使用 gunicorn/eventlet 时行为仍然存在
  • 当后台任务正在休眠时sleep(2),不会发生阻塞。

import eventlet
eventlet.monkey_patch()

# ... rest of code is a basic flask app create_app factory that at some # point starts the new thread if it's the main thread

# the async code that runs is the following

class AsyncAuditor(threading.Thread):

    def __init__(self, tasks: list, stop: threading.Event):
        super().__init__()
        self.tasks = tasks
        self.stop_event = stop

    def run(self):
        from app import init_app
        from dal import db

        app = init_app(mode='sys')
        app.logger.info('starting async audit thread')

        with app.app_context():
            try:
                while not self.stop_event.is_set():
                    if len(self.tasks) > 0:
                        task: dict
                        for task in self.tasks:
                            app.logger.debug(threading.current_thread().name + ' new audit record')
                            task.payload = encryptor.encrypt(task.payload)
                            task.ip = struct.unpack("!I", socket.inet_aton(task.ip))[0]
                            db.session.add(task)
                        self.tasks.clear()
                        db.session.commit()
                    sleep(2)
                app.logger.info('exiting async audit thread')
            except BaseException as e:
                app.logger.exception('Exception')

# there's some code that tries to gracefully exit if app needs to exit

stop_event = threading.Event()
async_task = AsyncAuditor(API.audit_tasks, stop_event)
async_task.start()

def exit_async_thread():
    stop_event.set()
    async_task.join()


atexit.register(exit_async_thread)


我希望在子线程工作时,主线程不会被任何数据库操作阻塞,事实上,就像我之前提到的,如果我不猴子补丁 eventlet,那么在主线程和子线程中一切正常一个也是。相反,当后台任务正在运行时,我在烧瓶应用程序中遇到一个端点时会出现 9 甚至 30 秒的延迟。

4

0 回答 0