我以这种方式使用 Prefect 安排任务:
#Python script
from prefect import task, Flow
from prefect.tasks.shell import ShellTask
from datetime import timedelta
from datetime import datetime
from prefect.schedules import IntervalSchedule
import os
import sys
schedule = IntervalSchedule(start_date=datetime.now() + timedelta(seconds=10),interval=timedelta(minutes=1))
can_start = True
with Flow("List files", schedule) as flow:
if can_start:
can_start = False
file_names = os.listdir("/home/admin/data/raw")
file_names = fnmatch.filter(file_names, "*fact*")
process_common.map(file_names)
can_start = True
out = flow.run()
但是,如果文件在第一次 Prefect 运行后到达我的目录,则 file_names 在第二次运行期间保持为空,并且在所有下一次运行期间也是如此。
我试图用 grep 命令来获取我的文件,然后它就可以工作了!
file_names = ShellTask(command="ls /home/admin/data/raw | grep fact", return_all=True, log_stderr=True, stream_output=True)
有人知道为什么会这样吗?非常感谢您的帮助。