问题标签 [apache-airflow]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1808 浏览

rabbitmq - 排队任务以在 RabbitMQ 上运行时,气流调度程序和 Web 服务器挂起

我正在努力让气流工作人员运行任务。我开始服务:

这些过程运行良好,但是当调度程序将要运行的任务添加到队列中或当我尝试从气流 UI 运行任务时,调度程序和网络服务器被挂起 - 持续加载不再继续 - 同时将任务添加到队列: 调度程序挂起 气流用户界面 网络服务器挂起 工作人员没有从 rabbitmq 队列接收任务 花服务器挂了

我认为这个问题与调度程序/网络服务器和队列之间的通信有关。我在airflow.cfg 文件中与代理相关的设置是:broker_url = amqp://guest:***@ksaprice_rabbitmq:15672//- 我也尝试过:broker_url = pyamqp://guest:***@ksaprice_rabbitmq:15672//。rabbitmq 是服务器运行良好,我也测试了登录名和密码凭据。

我使用的版本是:

  • 气流==1.8.1
  • 芹菜=4.1
  • rabbitmq 服务器 3.6

我是 Airflow 和 Rabbitmq 的新手。

更新: @Jean-Sébastien Pédron 的回答解决了我的排队问题,但我的工作人员仍然没有执行任务,并且虽然airflow worker服务在 8793 端口运行,但花没有显示工作人员。

兔MQ报告:

0 投票
1 回答
4307 浏览

rabbitmq - Airflow worker 没有监听默认的rabbitmq 队列

我已经用 rabbitmq 代理配置了 Airflow,服务:

正在运行,没有任何错误。调度程序正在推送要在defaultrabbitmq 队列上执行的任务:

在此处输入图像描述 在此处输入图像描述

即使我尝试过airflow worker -q=default- 工人仍然没有收到要运行的任务。我的airflow.cfg 设置文件:

rabbitmqctl report

更新:我尝试过的模块版本:带有 celery 3.x 的气流 1.8,带有 celery 4.1 和 celery 3.1.25 的气流 1.8.1,没有一个组合解决了这个问题。

0 投票
1 回答
614 浏览

airflow - 部署如何与 Airflow 配合使用?

我正在使用 Celery Executor 和来自这个dockerfile的设置。

我正在将我的 dag 部署/usr/local/airflow/dags到调度程序容器的目录中。

我可以使用以下命令运行我的 dag:

我的 dag 包含一个简单的 bash 运算符:

操作员运行test.sh脚本。

但是,如果test.sh引用其他文件,例如callme.sh,那么我会收到“找不到文件”错误。

运行 myworkflow 时,调用 test.sh 的任务被调用,但因找不到 callme.sh 而失败。

我觉得这很混乱。与工人共享代码资源文件是我的责任还是气流的责任?如果是我的,那么推荐的方法是什么?我正在考虑使用 EFS 并将其安装在所有容器上,但对我来说它看起来非常昂贵。

0 投票
1 回答
18488 浏览

python - 如何使用 TriggerDagRunOperator 触发 Airflow -dag

我找到了以下链接:

https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand

这确实解释了如何使用TriggerDagRunOperator来执行单独的 Airflow dag。该文档使用 Airflow 自己的示例 dag,但我很难理解它们,因为它们没有使用任何传感器。

有人可以解释我如何使用TriggerDagRunOperatorand开始单独的 dagSqlSensor吗?当我的 SQL Server 作业任务完成时,我正在尝试启动单独的 DAG。我知道如何使用 来检查 SQL Server 作业的状态SqlSensor,但我不知道如何将结果附加到TriggerDagRunOperator以启动单独的 DAG。

我不想使用 Airflow CLI 或在一个 DAG 中完成这两项任务。基本上,我希望这只是触发 dag。

以下是我当前的代码,缺少关键的conditionally_trigger

0 投票
2 回答
2067 浏览

python - 气流定制 jinja2 过滤器

我正在尝试为我的气流 jinja2 模板添加自定义过滤器。

因为我在 S3 中的文件夹就像

/年月日/

,我的目的是在变量屏幕中使用昨天_ds,如下所示:

s3://logs.web.com/AWSLogs/{{昨天_ds | get_year }}/{{昨天_ds | get_month }}/{{昨天_ds | get_day }}/

我在 PR(我认为它已经合并..)中看到,您可以在此处创建 dag 对象的 dag_args 参数中使用参数 'user_defined_filters' 来执行此操作

问题是,即使这样做,它也会显示“没有名为 get_year 的过滤器”。

这是我的代码:

dag.py

实用程序.py

有人看到我错在哪里了吗?谢谢!

编辑

在 dag 定义之后打印,没有显示自定义过滤器,不幸的是:(。

此外,如果我尝试将其直接添加为 DAG 对象参数,如测试@tests/models.py 中所示:

编辑 2

好的,我看到的是我有 1.8.0 版本,而这个没有过滤器。有人知道如何通过 pip 下载 1.8.2rc 吗?或者我们不能?

0 投票
2 回答
3304 浏览

python-2.7 - AssertionError:内部:未指定默认项目

气流新手。尝试运行 sql 并将结果存储在 BigQuery 表中。

出现以下错误。不确定在哪里设置 default_rpoject_id。

请帮我。

错误:

代码:

0 投票
2 回答
8945 浏览

python - Airflow:如何从 PostgreOperator 推送 xcom 价值?

我正在使用 Airflow 1.8.1,我想从 PostgreOperator 推送 sql 请求的结果。

这是我的任务:

这是我的 sql 脚本:

当我从中检查 xcom 值时,check_task它会检索none值。

0 投票
3 回答
1373 浏览

airflow - 气流返回“回填完成”而不运行任务

我正在运行 Airflow 并尝试迭代我们从命令行构建的某些任务。

运行气流网络服务器时,一切都按预期工作。但是当我跑步时airflow backfill dag task '2017-08-12',气流返回:

...并且实际上并没有运行dag。

当使用airflow testor airflow run(即涉及运行任务而不是 dag 的命令)时,它按预期工作

我犯了一个基本的错误吗?我该怎么做才能从这里调试?

谢谢

0 投票
2 回答
9312 浏览

python-2.7 - airflow.exceptions.AirflowException:在 DAG 中检测到循环。错误的任务

我正在运行气流管道,但代码看起来不错,但实际上我得到了气流。异常。气流异常:在 DAG 中检测到循环。错误的任务:你能帮忙解决这个问题吗

0 投票
1 回答
1914 浏览

airflow - 气流:动态 SubDag 创建

我有一个用例,其中有一个客户列表。客户端可以从列表中添加或删除,它们可以有不同的开始日期和不同的初始参数。

我想使用气流根据每个客户端的初始开始日期回填所有数据+如果出现故障则重新运行。我正在考虑为每个客户创建一个 SubDag。这会解决我的问题吗?

如何根据 client_id 动态创建 SubDag?