2

Luigi 执行方法的顺序是什么(运行、输出、要求)。我知道需要运行作为检查任务 DAG 有效性的第一个检查,但不应该在 run() 之后运行输出?

我实际上是在尝试等待运行中的 kafka 消息,并基于该消息触发一堆其他任务并返回 LocalTarget。像这样:

def run(self):
    for message in self.consumer:
        self.metadata_key = str(message.value, 'utf-8')
        self.path = os.path.join(settings.LUIGI_OUTPUT_PATH, self.metadata_key, self.batch_id)
        if not os.path.exists(self.path):
            os.mkdir(self.path)

        with self.conn.cursor() as cursor:
              all_accounts = cursor.execute('select domainname from tblaccountinfo;')
        for each in all_accounts:
            open(os.path.join(self.path,each)).close()

def output(self):
    return LocalTarget(self.path)

但是,我收到一条错误消息:

例外:必须设置路径或 is_tmp

返回 LocalTarget(self.path)行。为什么 luigi 尝试执行 def output() 方法直到 def run() 完成?

4

2 回答 2

2

当您运行管道(即一个或多个任务)时,Luigi 首先检查其输出目标是否已经存在,如果不存在,则安排任务运行。

Luigi 如何知道它必须检查哪些目标?它只是让他们调用你的任务的output()方法。

于 2016-12-29T18:21:04.487 回答
0

这不是执行顺序。Luigi 将在将任务设置为挂起状态之前使用 output() 方法检查我们要创建的文件是否存在。因此,如果您使用任何变量,它希望这些变量得到解决。在这里,您使用的是在 run 方法中创建的 self.path。这就是错误的原因。

您必须在类本身中创建路径并在输出方法中使用,或者在输出方法本身中创建它们并在运行方法中使用它们,如下所示

self.output().open('w').close()
于 2022-02-22T10:44:30.687 回答