我有一个检查 FTP 服务器上的文件的 dag(气流在单独的服务器上运行)。如果文件存在,则将文件移动到 S3(我们在此处存档)。从那里,文件名被传递给 Spark 提交作业。spark 作业将通过 S3(不同服务器上的 spark 集群)处理文件。我不确定我是否需要有多个 dag,但这是流程。我要做的是仅在 S3 存储桶中存在文件时才运行 Spark 作业。
我尝试使用 S3 传感器,但在满足超时标准后失败/超时,因此整个 dag 设置为失败。
check_for_ftp_files -> move_files_to_s3 -> submit_job_to_spark -> archive_file_once_done
我只想在一个或多个文件移动到 S3 时执行 FTP 检查的脚本之后运行所有内容。