2

我有一个带有女服务员的烧瓶应用程序,它在请求后获取一些数据,然后它运行一些长时间的计算long_function并返回结果。这些计算是并行的,我正在使用pebble,因为我需要一个超时选项。另外我希望用户能够发送重新启动服务器的请求(即他想更改线程数waitress

我找到了这个解决方案https://gist.github.com/naushadzaman/b65534d912f1551c7d8366b326b7a151 它大部分都可以工作,但它与我的pebble游泳池互动不佳。当服务器在池中时,我无法重新加载服务器。如果我使用long_function_without_pool不使用任何多处理,我可以重新加载服务器,即使它当前正在执行某些工作(当然,结果会丢失,但这就是我想要的)。但是long_function我必须等待池关闭,然后才能重新启动服务器。如果我在池仍处于打开状态时尝试发送重新启动请求,则会收到错误消息:

OSError: [Errno 98] Address already in use

所以我想如果有跑步,p.terminate()那是行不通的。Pool

如何修复此代码,或者我应该使用不同的解决方案?

有关如何复制此错误的简要说明:

  1. 启动应用程序

  2. 发送带有空正文的 POST 请求到http://localhost:5221/

  3. 在收到响应之前(您将有 5 秒)发送不带变量的 GET-request 到http://localhost:5221/restart/

  4. 请享用。服务器现在卡住了,没有任何响应

    import subprocess
    from flask import Flask
    from flask_restful import Api, Resource
    from flask_cors import CORS
    from webargs.flaskparser import parser, abort
    import json
    import time
    import sys
    from waitress import serve
    from multiprocessing import Process, Queue
    from concurrent.futures import TimeoutError
    from pebble import ProcessPool, ProcessExpired
    import functools
    
    some_queue = None
    
    
    APP = Flask(__name__)
    API = Api(APP)
    CORS(APP)
    
    @APP.route('/restart/', methods=['GET'], endpoint='start_flaskapp')
    def restart():
        try:
            some_queue.put("something")
            print("Restarted successfully")
            return("Quit")
        except:
            print("Failed in restart")
            return "Failed"
    
    def start_flaskapp(queue):
        global some_queue
        some_queue = queue
        API.add_resource(FractionsResource, "/")
        serve(APP, host='0.0.0.0', port=5221, threads=2)
    
    def long_function():
        with ProcessPool(5) as pool:
            data = [0, 1, 2, 3, 4]
            future = pool.map(functools.partial(add_const, const=1), data, timeout=5)
            iterator = future.result()
            result=[]
            while True:
                try:
                    result.append(next(iterator))
                except StopIteration:
                    break
                except TimeoutError as error:
                    print("function took longer than %d seconds" % error.args[1])
        return(result)
    
    def long_function_without_pool():
        data = [0, 1, 2, 3, 4]
        result = list(map(functools.partial(add_const, const=1), data))
        return(result)
    
    def add_const(number, const=0):
        time.sleep(5)
        return number+const
    
    class FractionsResource(Resource):
        @APP.route('/', methods=['POST'])
        def post():
            response = long_function()
            return(json.dumps(response))
    
    if __name__ == "__main__":
    
        q = Queue()
        p = Process(target=start_flaskapp, args=(q,))
        p.start()
        while True: #wathing queue, if there is no call than sleep, otherwise break
            if q.empty():
                time.sleep(1)
            else:
                break
        p.terminate() #terminate flaskapp and then restart the app on subprocess
        args = [sys.executable] + [sys.argv[0]]
        subprocess.call(args)
    
4

0 回答 0