我有一个带有女服务员的烧瓶应用程序,它在请求后获取一些数据,然后它运行一些长时间的计算long_function
并返回结果。这些计算是并行的,我正在使用pebble
,因为我需要一个超时选项。另外我希望用户能够发送重新启动服务器的请求(即他想更改线程数waitress
)
我找到了这个解决方案https://gist.github.com/naushadzaman/b65534d912f1551c7d8366b326b7a151
它大部分都可以工作,但它与我的pebble
游泳池互动不佳。当服务器在池中时,我无法重新加载服务器。如果我使用long_function_without_pool
不使用任何多处理,我可以重新加载服务器,即使它当前正在执行某些工作(当然,结果会丢失,但这就是我想要的)。但是long_function
我必须等待池关闭,然后才能重新启动服务器。如果我在池仍处于打开状态时尝试发送重新启动请求,则会收到错误消息:
OSError: [Errno 98] Address already in use
所以我想如果有跑步,p.terminate()
那是行不通的。Pool
如何修复此代码,或者我应该使用不同的解决方案?
有关如何复制此错误的简要说明:
启动应用程序
发送带有空正文的 POST 请求到http://localhost:5221/
在收到响应之前(您将有 5 秒)发送不带变量的 GET-request 到http://localhost:5221/restart/
请享用。服务器现在卡住了,没有任何响应
import subprocess from flask import Flask from flask_restful import Api, Resource from flask_cors import CORS from webargs.flaskparser import parser, abort import json import time import sys from waitress import serve from multiprocessing import Process, Queue from concurrent.futures import TimeoutError from pebble import ProcessPool, ProcessExpired import functools some_queue = None APP = Flask(__name__) API = Api(APP) CORS(APP) @APP.route('/restart/', methods=['GET'], endpoint='start_flaskapp') def restart(): try: some_queue.put("something") print("Restarted successfully") return("Quit") except: print("Failed in restart") return "Failed" def start_flaskapp(queue): global some_queue some_queue = queue API.add_resource(FractionsResource, "/") serve(APP, host='0.0.0.0', port=5221, threads=2) def long_function(): with ProcessPool(5) as pool: data = [0, 1, 2, 3, 4] future = pool.map(functools.partial(add_const, const=1), data, timeout=5) iterator = future.result() result=[] while True: try: result.append(next(iterator)) except StopIteration: break except TimeoutError as error: print("function took longer than %d seconds" % error.args[1]) return(result) def long_function_without_pool(): data = [0, 1, 2, 3, 4] result = list(map(functools.partial(add_const, const=1), data)) return(result) def add_const(number, const=0): time.sleep(5) return number+const class FractionsResource(Resource): @APP.route('/', methods=['POST']) def post(): response = long_function() return(json.dumps(response)) if __name__ == "__main__": q = Queue() p = Process(target=start_flaskapp, args=(q,)) p.start() while True: #wathing queue, if there is no call than sleep, otherwise break if q.empty(): time.sleep(1) else: break p.terminate() #terminate flaskapp and then restart the app on subprocess args = [sys.executable] + [sys.argv[0]] subprocess.call(args)