问题标签 [apache-airflow]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
airflow - 创建唯一文件名并在所有气流任务中访问该文件
每次气流 dag 运行并从所有任务访问该文件时,我们可以创建唯一的文件名吗?我尝试创建全局变量(output_filename)并为其附加时间戳。但是,当我在任务中访问该文件名时,每个任务都会生成不同的文件名,因为它正在计算每个任务中的时间戳。下面是示例代码:
我们有更多任务需要访问 output_filename。我们如何在所有任务中访问 output_filename 全局变量?
docker - 气流:无法将工作日志发送到 S3
我使用 Docker 镜像在我的 kubernetes 集群上部署了 Airflow 网络服务器、调度程序、worker 和 Flower。气流版本是 1.8.0。
现在我想将工作日志发送到 S3 和
- 从 Admin UI 创建 Airflow 的 S3 连接(只需设置
S3_CONN
为 conn id,s3
作为类型。因为我的 kubernetes 集群在 AWS 上运行并且所有节点都有 S3 访问角色,应该足够了) - 如下设置气流配置
remote_base_log_folder = s3://aws-logs-xxxxxxxx-us-east-1/k8s-airflow
remote_log_conn_id = S3_CONN
encrypt_s3_logs = False
首先我尝试创建一个 DAG,以便它在运行后立即引发异常。这行得通,可以在 S3 上看到日志。
所以我进行了修改,以便 DAG 现在创建一个 EMR 集群并等待它准备好(等待状态)。为此,我重新启动了所有 4 个 docker 容器的气流。
现在 DAG 看起来正在运行,集群已启动,一旦准备就绪,DAG 将标记为成功。但我在 S3 上看不到任何日志。
工作人员和 Web 服务器上没有相关的错误日志,所以我什至看不到可能导致此问题的原因。只是没有发送日志。
除了官方文档中的描述外,有谁知道 Airflow 的远程记录是否有一些限制? https://airflow.incubator.apache.org/configuration.html#logs
在 Airflow Web UI 中,本地日志优先于远程日志。如果无法找到或访问本地日志,则会显示远程日志。请注意,仅在任务完成(包括失败)后才会将日志发送到远程存储。换句话说,运行任务的远程日志不可用。
我没想到,但成功后,日志不会发送到远程存储吗?
workflow - 如何在特定目录上手动运行 Airflow DAG
我正在评估 Airflow 是否适合我的需求(在生物信息学方面)。我在使用 Airflow 模型时遇到了一些困难。具体来说:
- DAG 文件实际在哪里执行?它的背景是什么?如何将输入数据传递到 DAG 定义文件中?(例如,我想为目录中的每个文件创建一个任务。)
- 如何临时执行 DAG?如何为 DAG 构造传递参数?
这是我想要执行的示例。假设我刚刚收到一些数据作为目录,其中包含某些共享文件系统中可用的 20 个文件。我想执行一个 DAG 管道,它对 20 个文件中的每一个文件运行一个特定的 bash 命令,然后组合一些结果并执行进一步的处理。DAG 需要文件系统上的路径,还需要列出目录中的文件,以便为每个文件构建一个任务。
XCom
只要我可以预先动态构建整个 DAG,我可能不需要将元数据从一个任务传递到另一个任务(我理解这是可能的)。但我不清楚如何通过 DAG 构建路径。
换句话说,我希望我的 DAG 定义包括类似
input_path
当我想手动触发 DAG 时如何传入?
我也不需要 cron 样式的调度。
cron - 不同日期不同时间的 Cron 选项卡调度
我需要在周一至周四晚上 7 点安排工作,周五我需要在晚上 11 点安排工作。我正在使用 Airflow 并且需要 cron 制表符,例如
0 19 * * 周一至周四
欢迎任何建议。
谢谢
问候, CJ
airflow - 每月日期和时间的气流 DAG 调度
我们一直在将我们的 cron 作业转换为 Airflow DAG,我很难弄清楚 DAG 的调度在 Airflow 中是如何工作的。一些 DAG 需要在一天中的特定时间(即早上 7 点)运行,而其他 DAG 需要在每月的特定日期/时间(即每月 15 日早上 6 点)运行。
一般来说,Airflow 似乎每天都在正确运行 DAG。因此,schedule_interval = '0 7 * * *
每天'start_date': datetime(2017,4,7)
早上 7 点运行。
但是,对于每月 DAG (schedule_interval = '0 6 15 * *'
和'start_date': datetime(2017,4,7)
),它在 4 月 15 日早上 6 点运行,但从那时起就没有运行过。我尝试每月安排的其他 DAG 在第一个月后同样无法运行。
Airflow关于调度的文档是,IMO,浑浊,对其他SO问题的回答让我更加困惑。我希望有人能澄清我的理解和我试图每月安排的 DAG 出了什么问题。
airflow - 气流 - 是否可以使用 backfill 命令一次(按顺序)运行一天?
基本上,我想运行 backfill 命令整整一个月。但是有一些任务依赖于前一天的数据。据我所知,这个命令每天都在同一时间运行。
有没有办法让回填命令一次运行一天(按顺序)?
干杯。
airflow - 气流以编程方式取消暂停dag?
我有一个 dag,我们将部署到多个不同的气流实例,并且在 airflow.cfg 中我们有dags_are_paused_at_creation = True
,但是对于这个特定的 dag,我们希望无需通过单击 UI 手动打开它。有没有办法以编程方式做到这一点?
airflow - 来自cli的气流传递参数
有没有办法将参数传递给:
?
我有一个监视文件目录的脚本 - 当文件移动到目标目录时,我想触发作为参数传递文件路径的 dag。
python - 气流:将动态值传递给子 DAG 运算符
我是气流新手。
我遇到了一个场景,其中父 DAG 需要将一些动态数字(比如说n
)传递给子 DAG。
SubDAG 将使用此数字来动态创建n
并行任务。
气流文档没有涵盖实现这一目标的方法。所以我探索了几种方法:
选项 - 1(使用 xcom 拉取)
我试图作为 xcom 值传递,但由于某种原因,SubDAG 没有解析为传递的值。
父 Dag 文件
子日期文件
选项 - 2
我也尝试将number_of_runs
其作为全局变量传递,但它不起作用。
选项 - 3
我们还尝试将此值写入数据文件。但子 DAG 正在抛出File doesn't exist error
。这可能是因为我们正在动态生成这个文件。
有人可以帮我弄这个吗。
airflow - Dags和任务的气流结构/组织
我的问题:
- 为了组织你的 dags 和任务,什么是好的目录结构?(dags 示例仅显示几个任务)
- 我目前将我的 dags 放在 dags 文件夹的根目录中,并将我的任务放在单独的目录中,不知道该怎么做?
- 我们应该使用 zip 文件吗?https://github.com/apache/incubator-airflow/blob/a1f4227bee1a70531cfa90769149322513cb6f92/airflow/models.py#L280