0

我是 RQ 的新手,我正在尝试在我的烧瓶应用程序中实现它。我的一条路线的主要目标是更新数据库中的值。要设置我的工人,我正在使用以下内容:

from rq import Worker, Queue, Connection
import redis
import os

@app.before_first_request
def start_worker():
    def runworker():
        redis_url = os.environ.get("REDIS_URL") or 'redis://'
        conn = redis.from_url(redis_url)
        with Connection(conn):
            worker = Worker(list(map(Queue, listen)))
            worker.work()
    tp = ThreadPoolExecutor()
    tp.submit(runworker)


def get_redis_connection():
    redis_connection = getattr(g, '_redis_connection', None)
    if redis_connection is None:
        redis_url = os.environ.get('REDIS_URL') or 'redis://'
        redis_connection = redis.from_url(redis_url)
    return redis_connection


@app.before_request
def push_rq_connection():
    push_connection(get_redis_connection())


@app.teardown_request
def pop_rq_connection(exception=None):
    pop_connection()

然后更新路由将更新作业排队

@app.route('/update')
def update_db():
    q = Queue(connection=conn)
    job = q.q.enqueue('app.tasks.update_task', parameters)
    job_id = job.get_id()
    return {"job_id": job_id}, 201, {"Content-Type": 'application/json'}

最后,worker 运行更新函数

def update_task(parameters):
    # script to update DB

我知道工作人员设置(几乎)有效,因为如果我将 update_task 功能切换为简单的东西,例如:

def update_task(seconds):
    for i in range(seconds):
        print(i)
        time.sleep(1)
    return "Hello world"

有用。但是,对于真正的函数,我一直遇到我的环境变量没有被定义的问题,因为当我运行实际更新时,我得到异常说我的变量是 None 或类似的东西。

有谁知道如何处理 RQ 中的环境变量?我应该在某个配置文件之类的地方再次声明它们吗?

4

1 回答 1

0

好的,所以最好使用multiprocessing

from multiprocessing import Process

def runworker():
    redis_url = os.environ.get("REDIS_URL") or "redis://"
    conn = redis.from_url(redis_url)
    listen = ['default']
    with Connection(conn):
        worker = Worker(list(map(Queue, listen)))
        worker.work()

pp_worker = Process(target=runworker)
pp_worker.start()


@app.teardow_appcontext
def teardown_rq(self):
    pp_worker.terminate()

aaand 就行了。只是如果有人想知道它。

于 2021-03-13T21:25:17.730 回答