问题标签 [apache-airflow]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
rabbitmq - 排队任务以在 RabbitMQ 上运行时,气流调度程序和 Web 服务器挂起
我正在努力让气流工作人员运行任务。我开始服务:
这些过程运行良好,但是当调度程序将要运行的任务添加到队列中或当我尝试从气流 UI 运行任务时,调度程序和网络服务器被挂起 - 持续加载不再继续 - 同时将任务添加到队列:
我认为这个问题与调度程序/网络服务器和队列之间的通信有关。我在airflow.cfg 文件中与代理相关的设置是:broker_url = amqp://guest:***@ksaprice_rabbitmq:15672//
- 我也尝试过:broker_url = pyamqp://guest:***@ksaprice_rabbitmq:15672//
。rabbitmq 是服务器运行良好,我也测试了登录名和密码凭据。
我使用的版本是:
- 气流==1.8.1
- 芹菜=4.1
- rabbitmq 服务器 3.6
我是 Airflow 和 Rabbitmq 的新手。
更新:
@Jean-Sébastien Pédron 的回答解决了我的排队问题,但我的工作人员仍然没有执行任务,并且虽然airflow worker
服务在 8793 端口运行,但花没有显示工作人员。
兔MQ报告:
airflow - 部署如何与 Airflow 配合使用?
我正在使用 Celery Executor 和来自这个dockerfile的设置。
我正在将我的 dag 部署/usr/local/airflow/dags
到调度程序容器的目录中。
我可以使用以下命令运行我的 dag:
我的 dag 包含一个简单的 bash 运算符:
操作员运行test.sh
脚本。
但是,如果test.sh
引用其他文件,例如callme.sh
,那么我会收到“找不到文件”错误。
运行 myworkflow 时,调用 test.sh 的任务被调用,但因找不到 callme.sh 而失败。
我觉得这很混乱。与工人共享代码资源文件是我的责任还是气流的责任?如果是我的,那么推荐的方法是什么?我正在考虑使用 EFS 并将其安装在所有容器上,但对我来说它看起来非常昂贵。
python - 如何使用 TriggerDagRunOperator 触发 Airflow -dag
我找到了以下链接:
https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand
这确实解释了如何使用TriggerDagRunOperator
来执行单独的 Airflow dag。该文档使用 Airflow 自己的示例 dag,但我很难理解它们,因为它们没有使用任何传感器。
有人可以解释我如何使用TriggerDagRunOperator
and开始单独的 dagSqlSensor
吗?当我的 SQL Server 作业任务完成时,我正在尝试启动单独的 DAG。我知道如何使用 来检查 SQL Server 作业的状态SqlSensor
,但我不知道如何将结果附加到TriggerDagRunOperator
以启动单独的 DAG。
我不想使用 Airflow CLI 或在一个 DAG 中完成这两项任务。基本上,我希望这只是触发 dag。
以下是我当前的代码,缺少关键的conditionally_trigger
python - 气流定制 jinja2 过滤器
我正在尝试为我的气流 jinja2 模板添加自定义过滤器。
因为我在 S3 中的文件夹就像
/年月日/
,我的目的是在变量屏幕中使用昨天_ds,如下所示:
s3://logs.web.com/AWSLogs/{{昨天_ds | get_year }}/{{昨天_ds | get_month }}/{{昨天_ds | get_day }}/
我在 PR(我认为它已经合并..)中看到,您可以在此处创建 dag 对象的 dag_args 参数中使用参数 'user_defined_filters' 来执行此操作
问题是,即使这样做,它也会显示“没有名为 get_year 的过滤器”。
这是我的代码:
dag.py
实用程序.py
有人看到我错在哪里了吗?谢谢!
编辑
在 dag 定义之后打印,没有显示自定义过滤器,不幸的是:(。
此外,如果我尝试将其直接添加为 DAG 对象参数,如测试@tests/models.py 中所示:
编辑 2
好的,我看到的是我有 1.8.0 版本,而这个没有过滤器。有人知道如何通过 pip 下载 1.8.2rc 吗?或者我们不能?
python-2.7 - AssertionError:内部:未指定默认项目
气流新手。尝试运行 sql 并将结果存储在 BigQuery 表中。
出现以下错误。不确定在哪里设置 default_rpoject_id。
请帮我。
错误:
代码:
python - Airflow:如何从 PostgreOperator 推送 xcom 价值?
我正在使用 Airflow 1.8.1,我想从 PostgreOperator 推送 sql 请求的结果。
这是我的任务:
这是我的 sql 脚本:
当我从中检查 xcom 值时,check_task
它会检索none
值。
airflow - 气流返回“回填完成”而不运行任务
我正在运行 Airflow 并尝试迭代我们从命令行构建的某些任务。
运行气流网络服务器时,一切都按预期工作。但是当我跑步时airflow backfill dag task '2017-08-12'
,气流返回:
...并且实际上并没有运行dag。
当使用airflow test
or airflow run
(即涉及运行任务而不是 dag 的命令)时,它按预期工作
我犯了一个基本的错误吗?我该怎么做才能从这里调试?
谢谢
python-2.7 - airflow.exceptions.AirflowException:在 DAG 中检测到循环。错误的任务
我正在运行气流管道,但代码看起来不错,但实际上我得到了气流。异常。气流异常:在 DAG 中检测到循环。错误的任务:你能帮忙解决这个问题吗
airflow - 气流:动态 SubDag 创建
我有一个用例,其中有一个客户列表。客户端可以从列表中添加或删除,它们可以有不同的开始日期和不同的初始参数。
我想使用气流根据每个客户端的初始开始日期回填所有数据+如果出现故障则重新运行。我正在考虑为每个客户创建一个 SubDag。这会解决我的问题吗?
如何根据 client_id 动态创建 SubDag?