0

我正在评估 Airflow 是否适合我的需求(在生物信息学方面)。我在使用 Airflow 模型时遇到了一些困难。具体来说:

  • DAG 文件实际在哪里执行?它的背景是什么?如何将输入数据传递到 DAG 定义文件中?(例如,我想为目录中的每个文件创建一个任务。)
  • 如何临时执行 DAG?如何为 DAG 构造传递参数?

这是我想要执行的示例。假设我刚刚收到一些数据作为目录,其中包含某些共享文件系统中可用的 20 个文件。我想执行一个 DAG 管道,它对 20 个文件中的每一个文件运行一个特定的 bash 命令,然后组合一些结果并执行进一步的处理。DAG 需要文件系统上的路径,还需要列出目录中的文件,以便为每个文件构建一个任务。

XCom只要我可以预先动态构建整个 DAG,我可能不需要将元数据从一个任务传递到另一个任务(我理解这是可能的)。但我不清楚如何通过 DAG 构建路径。

换句话说,我希望我的 DAG 定义包括类似

dag = DAG(...)
for file in glob(input_path):
    t = BashOperator(..., dag=dag)

input_path当我想手动触发 DAG 时如何传入?

我也不需要 cron 样式的调度。

4

1 回答 1

0

关于input_path您可以使用 Airflow 变量将其传递给 DAG。DAG 文件中使用的代码示例:

input_path = Variable.get("INPUT_PATH")

可以使用 Airflow cli 或通过 UI 手动导入变量。

您应该为这种类型的逻辑使用 subdag:

dag = DAG(...) for file in glob(input_path): t = BashOperator(..., dag=dag)

SubDAG 非常适合重复模式。在使用 Airflow 时,定义一个返回 DAG 对象的函数是一种很好的设计模式。

于 2017-08-04T14:54:31.663 回答