我现在有大量文档要处理,并且正在使用 Python RQ 来并行化任务。
我希望在每个文档上执行不同的操作时完成一系列工作。例如:A
-> B
->C
表示将文档传递给 function A
,A
完成后,继续执行B
和 last C
。
然而,Python RQ 似乎并不能很好地支持管道的东西。
这是一个简单但有点脏的做法。总之,流水线中的每个函数都以嵌套方式调用其下一个函数。
例如,对于管道A
-> B
-> C
。
在顶层,一些代码是这样编写的:
q.enqueue(A, the_doc)
其中 q 是Queue
实例,在函数A
中有如下代码:
q.enqueue(B, the_doc)
在 中B
,有这样的东西:
q.enqueue(C, the_doc)
还有比这更优雅的方法吗?例如ONE函数中的一些代码:
q.enqueue(A, the_doc)
q.enqueue(B, the_doc, after = A)
q.enqueue(C, the_doc, after= B)
depends_on参数是最接近我要求的参数,但是,运行类似:
A_job = q.enqueue(A, the_doc)
q.enqueue(B, depends_on=A_job )
不会工作。Asq.enqueue(B, depends_on=A_job )
在执行后立即A_job = q.enqueue(A, the_doc)
执行。当 B 入队时,来自 A 的结果可能还没有准备好,因为它需要时间来处理。
PS:
如果 Python RQ 在这方面不是很擅长,我可以使用 Python 中的其他工具来实现相同的目的:
- 循环并行化
- 管道处理支持