0

我以这种方式使用 Prefect 安排任务:

#Python script
from prefect import task, Flow
from prefect.tasks.shell import ShellTask
from datetime import timedelta
from datetime import datetime
from prefect.schedules import IntervalSchedule
import os
import sys

schedule = IntervalSchedule(start_date=datetime.now() + timedelta(seconds=10),interval=timedelta(minutes=1))
can_start = True

with Flow("List files", schedule) as flow:
    
    if can_start:
        can_start = False
        file_names = os.listdir("/home/admin/data/raw")
        file_names = fnmatch.filter(file_names, "*fact*")
        process_common.map(file_names)
        can_start = True
    
out = flow.run()

但是,如果文件在第一次 Prefect 运行后到达我的目录,则 file_names 在第二次运行期间保持为空,并且在所有下一次运行期间也是如此。

我试图用 grep 命令来获取我的文件,然后它就可以工作了!

file_names = ShellTask(command="ls /home/admin/data/raw | grep fact", return_all=True, log_stderr=True, stream_output=True)

有人知道为什么会这样吗?非常感谢您的帮助。

4

1 回答 1

1

这是一个常见的混淆点 - 您将构建时逻辑与运行时逻辑混为一谈(有关另一个示例,请参阅此 SO 帖子)。

您希望在运行时生效的所有逻辑都应封装为 Prefect 任务 - 在您的情况下,您可能需要使用 Prefect 的条件任务来实现您的结果,尽管您可能能够摆脱更简单的事情。

特别是,以下代码似乎具有预期的结果:

@task
def get_filenames():
    file_names = os.listdir("/home/admin/data/raw")
    file_names = fnmatch.filter(file_names, "*fact*")
    return file_names


with Flow("List files", schedule) as flow:
    process_common.map(file_names) # if the list is empty, nothing will happen
    
out = flow.run()

最后,请注意,您可以使用SKIP 信号根据动态运行时条件有效地将任务标记为“已跳过” 。

于 2021-01-27T19:24:33.683 回答