我正在评估 Airflow 是否适合我的需求(在生物信息学方面)。我在使用 Airflow 模型时遇到了一些困难。具体来说:
- DAG 文件实际在哪里执行?它的背景是什么?如何将输入数据传递到 DAG 定义文件中?(例如,我想为目录中的每个文件创建一个任务。)
- 如何临时执行 DAG?如何为 DAG 构造传递参数?
这是我想要执行的示例。假设我刚刚收到一些数据作为目录,其中包含某些共享文件系统中可用的 20 个文件。我想执行一个 DAG 管道,它对 20 个文件中的每一个文件运行一个特定的 bash 命令,然后组合一些结果并执行进一步的处理。DAG 需要文件系统上的路径,还需要列出目录中的文件,以便为每个文件构建一个任务。
XCom
只要我可以预先动态构建整个 DAG,我可能不需要将元数据从一个任务传递到另一个任务(我理解这是可能的)。但我不清楚如何通过 DAG 构建路径。
换句话说,我希望我的 DAG 定义包括类似
dag = DAG(...)
for file in glob(input_path):
t = BashOperator(..., dag=dag)
input_path
当我想手动触发 DAG 时如何传入?
我也不需要 cron 样式的调度。