1

我有一个检查 FTP 服务器上的文件的 dag(气流在单独的服务器上运行)。如果文件存在,则将文件移动到 S3(我们在此处存档)。从那里,文件名被传递给 Spark 提交作业。spark 作业将通过 S3(不同服务器上的 spark 集群)处理文件。我不确定我是否需要有多个 dag,但这是流程。我要做的是仅在 S3 存储桶中存在文件时才运行 Spark 作业。

我尝试使用 S3 传感器,但在满足超时标准后失败/超时,因此整个 dag 设置为失败。

check_for_ftp_files -> move_files_to_s3 -> submit_job_to_spark -> archive_file_once_done

我只想在一个或多个文件移动到 S3 时执行 FTP 检查的脚本之后运行所有内容。

4

2 回答 2

1

您可以有 2 个不同的 DAG。一个只有 S3 传感器并保持运行,比如说,每 5 分钟一次。如果找到该文件,则触发第二个 DAG。第二个 DAG 将文件提交到 S3 并在完成后归档。您可以在第一个 DAG 中使用 TriggerDagRunOperator 进行触发。

于 2017-06-26T08:50:54.630 回答
0

他给出的答案会奏效。另一种选择是使用传感器具有的“soft_fail”参数(它是来自 BaseSensorOperator 的参数)。如果将此参数设置为 True,则不会使任务失败,而是会跳过它,并且分支中的所有后续任务也将被跳过。

有关详细信息,请参阅气流代码

于 2018-02-22T17:05:37.783 回答