问题标签 [airflow-scheduler]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
3 回答
5963 浏览

airflow - 如何将参数从 pythonoperator 任务传递给气流 dag 中的 simplehttpoperator 任务?

我想触发一个简单的httpoperator,像这样:airflow trigger_dag test_trigger --conf '{"name":"something"}'

然后我使用 pythonoperator python_callable 通过使用 kwargs['dag_run'].conf 来接受参数,我想将 ['dag_run'].conf 传递给 simplehttpoperator,我该怎么做?有人可以帮忙吗?

0 投票
1 回答
4981 浏览

airflow - 气流 DAG 未安排

我是 Airflow 的新手,并创建了我的第一个 DAG。这是我的 DAG 代码。我希望 DAG 现在开始,然后每天运行一次。

DAG 没有被 Airflow 选中。我检查了日志,这就是它所说的。

我到底做错了什么?我尝试将 schedule_interval 更改为 schedule_interval=timedelta(minutes=1) 以查看它是否立即启动,但仍然没有用。我可以在 Airflow UI 中按预期看到 DAG 下的任务,但计划状态为“无状态”。请在这里帮助我。

0 投票
1 回答
4274 浏览

python - Airflow ExternalTask​​Sensors - 创建 dags 依赖项

我想创建对 DAG A 和 DAG B 的 dag 依赖项。DAG A 有两个任务: TASK1 和 TASK2 。DAG B 有 3 个任务:TASK1、TASK2 和 TASK3。

我的要求是 DAG B 在 DAG A TASK1 之后开始。

两个 DAGS 都是每小时运行一次,DAG A 运行 @every hours EX: 10.00,DAG B 运行 @every hours ex:10.30。

我正在使用 Airflow 和运算符 EXternalTask​​Sensors 但它不起作用。

0 投票
3 回答
716 浏览

airflow - Apache Airflow Celery Redis 解码错误

使用最新版本的 apache 气流。从 LocalExecutor 开始,在该模式下一切正常,除了一些交互,Web UI 状态需要 CeleryExecutor 才能使用它们。使用 Redis 安装和配置 Celery 执行器,将 Redis 配置为代理 URL 和结果后端。

一开始它似乎可以工作,直到安排了一个任务,此时它给出了以下错误:

似乎是泡菜序列化错误,但我不确定如何追踪原因。有什么建议么?

这个问题一直影响我使用 subdag 功能的工作流程,也许问题与此有关。

注意:我也使用 rabbitMQ 进行了测试,那里有不同的问题;客户端显示“对等方重置连接”并崩溃。RabbitMQ 日志显示“客户端意外关闭 TCP 连接”。

0 投票
1 回答
1127 浏览

airflow - 如何设置多操作员 dag,以便在运行实例的所有任务完成之前不会实例化另一个实例?

我们的气流实施中有多运营商 dags。假设 dag-a 有运算符 t1、t2、t3,它们设置为按顺序运行(即 t2 依赖于 t1,t3 依赖于 t2。)

我们需要确保当 dag-a 被实例化时,它的所有任务在同一个 dag 的另一个实例被实例化之前(或在下一个 dag 实例上的第一个任务被触发之前)成功完成。

我们在 dags 中设置了以下内容:

现在发生的情况是,如果实例化的 dag 没有任何错误,我们就会看到预期的效果。
但是,假设 dag-a 计划每小时运行一次。按计划触发 dag-a-i1 实例。然后 dag-a-i1 任务 t1 运行成功,然后 t2 开始运行并失败。在那种情况下,我们看到 dag-a-i1 实例按预期停止。当下一个小时到来时,我们看到 dag-a-i2 实例被触发,我们看到该 dag 实例 (i2) 的任务 t1 开始运行并假设完成,然后 dag-a-i2 停止,因为它的 t2 不能运行,因为 t2 的先前实例(对于 dag-a-i1)处于失败状态。

我们需要看到的行为是第二​​个实例没有被触发,或者如果它被触发,我们不希望看到第二个实例的任务 t1 被触发。这给我们带来了问题。

任何帮助表示赞赏。

0 投票
0 回答
593 浏览

python-2.7 - 气流回填到现在还没有完全填满

我正在努力解决一个我似乎无法弄清楚的奇怪问题。我有一个没有什么花哨的基本 DAG。它只是利用 bash 运算符来启动 Python 脚本。

我有这个 DAG 计划在每个星期一运行。当我打开网络服务器中的 dag 时,它开始回填到 9 月 25 日。但是,它不会在 10 月 2 日回填。当我将日程安排从每周更改为每天时,它工作正常。

这是我的 DAG 设置。

从这张照片中可以看出,回填到 25 日为止都很好。之后没有新任务排队。

DAG 运行

我在这里做错了什么?我有其他 DAG 已经运行好几个星期了。我还重新启动了调度程序和网络服务器,这没有帮助。

编辑:

下面的主题似乎涵盖了同样的问题。但是,这改变了我的问题。如何让气流在给定日期每周运行,而不是等待整个时间段结束?

气流不会回填最新运行

0 投票
1 回答
7109 浏览

airflow - 如何定义不应定期运行的 Airflow DAG/任务

目标非常简单:我需要为不应定期运行的手动任务创建 DAG,但仅当管理员按下“运行”按钮时。理想情况下,无需切换“取消暂停”和“暂停” DAG(您知道有人肯定会忘记暂停)。

到目前为止,我只来了schedule_interval="0 0 30 2 *"(希望永远不会发生 2 月 30 日),但一定有更好的方法!

在那儿?

0 投票
5 回答
10769 浏览

airflow - DAG 在 Airflow 中完成运行后如何删除 XCOM 对象

我在 XCOM 中有一个巨大的 json 文件,稍后一旦 dag 执行完成,我就不需要它了,但是我仍然在 UI 中看到带有所有数据的 Xcom 对象,有没有办法在 DAG 运行后以编程方式删除 XCOM完成了。

谢谢

0 投票
0 回答
445 浏览

google-cloud-dataflow - 通过 Luigi 或 Airflow 进行云数据流工作流管理

我希望通过任何工作流调度程序管理我的依赖云数据流作业。以前有人做过吗?我也浏览了 Airflow 和 Luigi 的文档部分,但我确实需要一些工作示例。

请帮助我提供一些相关示例或链接,这些示例或链接可以帮助我探索和实施 Dataflow 作业的工作流管理。

0 投票
2 回答
4844 浏览

airflow - 气流回填新任务添加到 dag

假设今天是 2017-10-20。我有一个现有的 dag,直到今天仍然成功。我需要添加一个 start_date 为 2017-10-01 的任务。如何使调度程序从 2017-10-01 到 2017-10-20 自动触发任务?