我编写了一段简单的代码来运行 Luigi 中的任务。代码如下:
import luigi
count = 0
class TaskC(luigi.Task):
def requires(self):
return None
def run(self):
print("Running task C ...")
global count
with self.output().open('w') as outfile:
outfile.write("Finished task C, count = %d", count)
count += 1
def output(self):
return luigi.LocalTarget("./logs/task_c.txt")
class TaskB(luigi.Task):
def requires(self):
return None
def run(self):
print("Running task B ...")
global count
with self.output().open('w') as outfile:
outfile.write("Finished task B, count = %d ...", count)
count += 1
def output(self):
return luigi.LocalTarget("./logs/task_b.txt")
class TaskA(luigi.Task):
def requires(self):
return [TaskB(), TaskC()]
def run(self):
print("Running task A ...")
global count
with self.output().open('w') as outfile:
outfile.write("Finished task A, count = %d ...", count)
count += 1
def output(self):
return luigi.LocalTarget("./logs/task_a.txt")
if __name__ == '__main__':
print("Start the fisrt luigi app :)")
luigi.run()
期望:我想运行TaskA,但TaskA需要TaskB和TaskC - > TaskB和TaskC应该在两个任务B,C完成之前和首先运行,然后TaskA可以运行
实际:仅 TaskA 运行。其他任务没有。登录控制台:
Start the fisrt luigi app :)
DEBUG: Checking if TaskA() is complete
INFO: Informed scheduler that task TaskA__99914b932b has status DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=382715991, workers=1, host=w10tng, username=tng, pid=2096) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 complete ones were encountered:
- 1 TaskA()
Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
我曾经运行的命令:
python first_luigi_app.py --local-scheduler TaskA
我不知道我是否错过了一些东西!如果有人可以提供帮助将不胜感激:)