28

Flask 中有没有办法将响应发送给客户端,然后继续进行一些处理?我有一些簿记任务要完成,但我不想让客户等待。

请注意,这些实际上是我想做的非常快的事情,因此在这里创建一个新线程或使用队列并不合适。(这些快速的事情之一实际上是在作业队列中添加一些东西。)

4

6 回答 6

10

快速简便的方法。

我们将使用 pythons线程库来实现这一点。

您的 API 使用者已经发送了一些要处理的内容,这些内容由my_task()函数处理,该函数需要10 秒才能执行。但是 API 的使用者希望在他们访问您的 API(即return_status()函数)后立即得到响应。

您将my_task绑定到一个线程,然后将快速响应返回给 API 使用者,而在后台,大进程将完成。

下面是一个简单的 POC。

import os
from flask import Flask,jsonify
import time
from threading import Thread

app = Flask(__name__)

@app.route("/")
def main():
    return "Welcome!"

@app.route('/add_')
def return_status():
    """Return first the response and tie the my_task to a thread"""
    Thread(target = my_task).start()
    return jsonify('Response asynchronosly')

def my_task():
    """Big function doing some job here I just put pandas dataframe to csv conversion"""
    time.sleep(10)
    import pandas as pd
    pd.DataFrame(['sameple data']).to_csv('./success.csv')
    return print('large function completed')

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)
于 2020-09-22T12:32:11.223 回答
10

遗憾的是,在响应返回给客户端后,拆解回调不会执行:

import flask
import time
app = flask.Flask("after_response")

@app.teardown_request
def teardown(request):
    time.sleep(2)
    print("teardown_request")

@app.route("/")
def home():
    return "Success!\n"

if __name__ == "__main__":
    app.run()

卷曲时,您会注意到响应显示之前有 2 秒的延迟,而不是卷曲立即结束,然后是 2 秒后的日志。日志进一步证实了这一点:

teardown_request
127.0.0.1 - - [25/Jun/2018 15:41:51] "GET / HTTP/1.1" 200 -

返回响应后执行的正确方法是使用 WSGI 中间件,该中间件在响应迭代器的 close 方法中添加了一个钩子。这不像teardown_request装饰器那么简单,但它仍然非常简单:

import traceback
from werkzeug.wsgi import ClosingIterator

class AfterResponse:
    def __init__(self, app=None):
        self.callbacks = []
        if app:
            self.init_app(app)

    def __call__(self, callback):
        self.callbacks.append(callback)
        return callback

    def init_app(self, app):
        # install extension
        app.after_response = self

        # install middleware
        app.wsgi_app = AfterResponseMiddleware(app.wsgi_app, self)

    def flush(self):
        for fn in self.callbacks:
            try:
                fn()
            except Exception:
                traceback.print_exc()

class AfterResponseMiddleware:
    def __init__(self, application, after_response_ext):
        self.application = application
        self.after_response_ext = after_response_ext

    def __call__(self, environ, start_response):
        iterator = self.application(environ, start_response)
        try:
            return ClosingIterator(iterator, [self.after_response_ext.flush])
        except Exception:
            traceback.print_exc()
            return iterator

然后您可以像这样使用它:

@app.after_response
def after():
    time.sleep(2)
    print("after_response")

在 shell 中,您将立即看到响应返回,然后 2 秒后将after_response显示日志:

127.0.0.1 - - [25/Jun/2018 15:41:51] "GET / HTTP/1.1" 200 -
after_response

这是此处提供的先前答案的摘要。

于 2018-06-25T15:52:03.777 回答
4

我的博客也有类似的问题。我想在发布新评论时向订阅评论的人发送通知电子邮件,但我不希望发布评论的人在收到回复之前等待所有电子邮件发送完毕。

multiprocessing.Pool为此使用了一个。我启动了一个由一个工作人员组成的池(足够了,流量低的站点),然后每次我需要发送电子邮件时,我都会在 Flask 视图函数中准备好所有内容,但send_email通过apply_async.

于 2013-06-25T22:47:25.973 回答
2

您可以在此处找到有关如何在 Flask 中使用 celery 的示例https://gist.github.com/jzempel/3201722

这个想法的要点(双关语)是将长期的簿记任务定义为@celery.task,并使用apply_async 1或延迟从视图中启动任务

于 2013-06-25T11:13:54.110 回答
2

听起来像Teardown Callbacks会支持你想要的。您可能希望将其与Per-Request After-Request Callbacks中的模式结合起来,以帮助组织代码。

于 2013-06-26T06:34:00.830 回答
0

您可以使用从 Werkzeug 响应对象的装饰器close中公开的 WSGI 协议来执行此操作。call_on_close在此处的其他答案中进行了解释:https ://stackoverflow.com/a/63080968/78903

于 2020-08-07T12:56:33.360 回答