-1

我编写了一段简单的代码来运行 Luigi 中的任务。代码如下:

import luigi

count = 0

class TaskC(luigi.Task):
    def requires(self):
        return None

    def run(self):
        print("Running task C ...")
        global count
        with self.output().open('w') as outfile:
            outfile.write("Finished task C, count = %d", count)
            count += 1
        
    def output(self):
        return luigi.LocalTarget("./logs/task_c.txt")


class TaskB(luigi.Task):
    def requires(self):
        return None

    def run(self):
        print("Running task B ...")
        global count
        with self.output().open('w') as outfile:
            outfile.write("Finished task B, count = %d ...", count)
            count += 1
        
    def output(self):
        return luigi.LocalTarget("./logs/task_b.txt")


class TaskA(luigi.Task):

    def requires(self):
        return [TaskB(), TaskC()]

    def run(self):
        print("Running task A ...")
        global count
        with self.output().open('w') as outfile:
            outfile.write("Finished task A, count = %d ...", count)
            count += 1

    def output(self):
        return luigi.LocalTarget("./logs/task_a.txt")

if __name__ == '__main__':
    print("Start the fisrt luigi app :)")
    luigi.run()

期望:我想运行TaskA,但TaskA需要TaskB和TaskC - > TaskB和TaskC应该在两个任务B,C完成之前和首先运行,然后TaskA可以运行

实际:仅 TaskA 运行。其他任务没有。登录控制台:

Start the fisrt luigi app :)
DEBUG: Checking if TaskA() is complete
INFO: Informed scheduler that task   TaskA__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=382715991, workers=1, host=w10tng, username=tng, pid=2096) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 complete ones were encountered:
    - 1 TaskA()

Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

我曾经运行的命令

python first_luigi_app.py --local-scheduler TaskA

我不知道我是否错过了一些东西!如果有人可以提供帮助将不胜感激:)

4

1 回答 1

0

您可以尝试通过返回 None 来从任务 B 和任务 C 中删除所需的方法,因为它们会被跳过。此外,当使用 f-string 格式化时,它工作正常。运行: python -m luigi --module l1 TaskA --local-scheduler 其中 l1 是 l1.py(代码的副本)

于 2021-10-20T14:50:21.307 回答