6

我现在有大量文档要处理,并且正在使用 Python RQ 来并行化任务。

我希望在每个文档上执行不同的操作时完成一系列工作。例如:A-> B->C表示将文档传递给 function AA完成后,继续执行B和 last C

然而,Python RQ 似乎并不能很好地支持管道的东西。

这是一个简单但有点脏的做法。总之,流水线中的每个函数都以嵌套方式调用其下一个函数。

例如,对于管道A-> B-> C

在顶层,一些代码是这样编写的:

q.enqueue(A, the_doc)

其中 q 是Queue实例,在函数A中有如下代码:

q.enqueue(B, the_doc)

在 中B,有这样的东西:

q.enqueue(C, the_doc)

还有比这更优雅的方法吗?例如ONE函数中的一些代码:

q.enqueue(A, the_doc) q.enqueue(B, the_doc, after = A) q.enqueue(C, the_doc, after= B)

depends_on参数是最接近我要求的参数,但是,运行类似:

A_job = q.enqueue(A, the_doc) q.enqueue(B, depends_on=A_job )

不会工作。Asq.enqueue(B, depends_on=A_job )在执行后立即A_job = q.enqueue(A, the_doc)执行。当 B 入队时,来自 A 的结果可能还没有准备好,因为它需要时间来处理。

PS:

如果 Python RQ 在这方面不是很擅长,我可以使用 Python 中的其他工具来实现相同的目的:

  1. 循环并行化
  2. 管道处理支持
4

2 回答 2

4

当 B 入队时,来自 A 的结果可能还没有准备好,因为它需要时间来处理。

我不确定当您最初发布问题时这是否真的是真的,但无论如何,现在这不是真的。事实上,该depends_on功能正是为您描述的工作流程而设计的。

确实,这两个函数是立即连续执行的。

A_job = q.enqueue(A, the_doc)
B_job = q.enqueue(B, depends_on=A_job )

B但是工人在完成之前不会执行A。直到A_job成功执行,B.status == 'deferred'. 一次A.status == 'finished',然后B将开始运行。

这意味着B并且C可以像这样访问和操作它们的依赖项的结果:

import time
from rq import Queue, get_current_job
from redis import StrictRedis

conn = StrictRedis()
q = Queue('high', connection=conn)

def A():
    time.sleep(100)
    return 'result A'

def B():
    time.sleep(100)
    current_job = get_current_job(conn)
    a_job_id = current_job.dependencies[0].id
    a_job_result = q.fetch_job(a_job_id).result
    assert a_job_result == 'result A'
    return a_job_result + ' result B'


def C():
    time.sleep(100)
    current_job = get_current_job(conn)
    b_job_id = current_job.dependencies[0].id
    b_job_result = q.fetch_job(b_job_id).result
    assert b_job_result == 'result A result B'
    return b_job_result + ' result C'

工人最终将打印'result A result B result C'.

此外,如果队列中有许多作业并且B可能在执行之前等待一段时间,您可能希望显着增加result_ttl或使其无限期result_ttl=-1。否则,无论设置多少秒,A 的结果都将被清除,result_ttl在这种情况下,B将不再能够访问它并返回所需的结果。

然而,设置result_ttl=-1具有重要的记忆意义。这意味着您的作业结果将永远不会被自动清除,并且内存将按比例增长,直到您手动从 redis 中删除这些结果。

于 2016-06-09T15:26:29.003 回答
0

depends_on 参数是最接近我要求的参数,但是,运行类似:

A_job = q.enqueue(A, the_doc) q.enqueue(B, depends_on=A_job )

不会工作。因为 q.enqueue(B, depends_on=A_job ) 在 A_job = q.enqueue(A, the_doc) 执行后立即执行。当 B 入队时,来自 A 的结果可能还没有准备好,因为它需要时间来处理。

对于这种情况, q.enqueue(B, depends_on=A_job) 将在 A_job 完成后运行。如果结果没有准备好, q.enqueue(B, depends_on=A_job) 会一直等到它准备好。


它不支持开箱即用,但使用其他技术是可能的。

在我的例子中,我使用缓存来跟踪链中的前一个作业,所以当我们想要将一个新函数加入队列(在之后运行)时,我们可以在调用 enqueue() 时正确设置它的 'depends_on' 参数

请注意使用附加参数来排队:'timeout, result_ttl, ttl'。因为我在 rq 上运行长时间的作业,所以使用了这些。您可以在 python rq 文档中参考它们的使用。

我使用了源自python rq的 django_rq.enqueue()

    # main.py
    def process_job():
        ...

        # Create a cache key for every chain of methods you want to call.
        # NOTE: I used this for web development, in your case you may want
        # to use a variable or a database, not caching

        # Number of time to cache and keep the results in rq
        TWO_HRS = 60 * 60 * 2

        cache_key = 'update-data-key-%s' % obj.id
        previous_job_id = cache.get(cache_key)
        job = django_rq.enqueue(update_metadata,
                                campaign=campaign,
                                list=chosen_list,
                                depends_on=previous_job_id,
                                timeout=TWO_HRS,
                                result_ttl=TWO_HRS,
                                ttl=TWO_HRS)

        # Set the value for the most recent finished job, so the next function
        # in the chain can set the proper value for 'depends_on'
        cache.set(token_key, job.id, TWO_HRS)

    # utils.py
    def update_metadata(campaign, list):
        # Your code goes here to update the campaign object with the list object
        pass

'depends_on' - 来自rq 文档

depends_on - 指定在此作业排队之前必须完成的另一个作业(或作业 ID)

于 2015-11-17T22:22:35.990 回答