Luigi 执行方法的顺序是什么(运行、输出、要求)。我知道需要运行作为检查任务 DAG 有效性的第一个检查,但不应该在 run() 之后运行输出?
我实际上是在尝试等待运行中的 kafka 消息,并基于该消息触发一堆其他任务并返回 LocalTarget。像这样:
def run(self):
for message in self.consumer:
self.metadata_key = str(message.value, 'utf-8')
self.path = os.path.join(settings.LUIGI_OUTPUT_PATH, self.metadata_key, self.batch_id)
if not os.path.exists(self.path):
os.mkdir(self.path)
with self.conn.cursor() as cursor:
all_accounts = cursor.execute('select domainname from tblaccountinfo;')
for each in all_accounts:
open(os.path.join(self.path,each)).close()
def output(self):
return LocalTarget(self.path)
但是,我收到一条错误消息:
例外:必须设置路径或 is_tmp
在返回 LocalTarget(self.path)行。为什么 luigi 尝试执行 def output() 方法直到 def run() 完成?