问题标签 [apache-airflow]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
22100 浏览

airflow - 在气流中,有没有一种好方法可以调用另一个 dag 的任务?

我有 dag_prime 和 dag_tertiary。

  • dag_prime:扫描目录并打算在每个目录上调用 dag_tertiary。目前是 PythonOperator。
  • dag_tertiary:扫描传递给它的目录并对其内容进行(可能是时间密集的)计算。

我可以从 python 操作员的系统调用中调用第二个,但我觉得必须有更好的方法。如果有一种简单的方法可以做到这一点,我还想考虑对 dag_tertiary 调用进行排队。有没有比使用系统调用更好的方法?

谢谢!

0 投票
1 回答
1460 浏览

airflow - 在气流主节点上运行特定任务

我有一个任务列表,其中包含在不同工作节点上使用 celery 执行程序运行的任务列表。但是,我想在主节点上运行其中一项任务。那可能吗?

0 投票
2 回答
487 浏览

directed-acyclic-graphs - 气流:每个任务运行大量实例时需要建议

这是我在 Stack 上的第一篇文章,是关于 Airflow 的。我需要实现一个 DAG,它将:

1/ 从 API 下载文件

2/ 将它们上传到 Google Cloud Storage

3/ 将它们插入 BigQuery

问题是第 1 步涉及大约 170 个要调用的帐户。如果在下载过程中出现任何错误,我希望我的 DAG 从异常结束的步骤自动重试。因此,我在我的任务之上实现了一个循环,例如:

所以在 UI 级别,我有大约 170 个每个任务显示的实例。当我手动运行 DAG 时,就我所见,Airflow 什么也没做。DAG 不会初始化或排队任何任务实例。我想这是由于涉及的实例数量,但我不知道如何解决这个问题。

我应该如何管理这么多任务实例?

谢谢,

亚历克斯

0 投票
2 回答
24227 浏览

python - 气流默认 on_failure_callback

在我的 DAG 文件中,我定义了一个 on_failure_callback() 函数来发布 Slack 以防失败。

如果我为我的 DAG 中的每个运算符指定它会很好:on_failure_callback=on_failure_callback()

有没有办法自动化(例如通过 default_args 或通过我的 DAG 对象)向我的所有操作员分派?

0 投票
2 回答
5225 浏览

python - 如何动态迭代上游任务的输出以在气流中创建并行任务?

考虑以下 DAG 示例,其中第一个任务 ,get_id_creds从数据库中提取凭据列表。此操作告诉我数据库中的哪些用户可以运行进一步的数据预处理,并将这些 id 写入文件/tmp/ids.txt。然后我将这些 id 扫描到我的 DAG 中,并使用它们生成upload_transaction可以并行运行的任务列表。

我的问题是:有没有更惯用的正确、动态的方式来使用气流来做到这一点?我这里的东西感觉笨拙而脆弱。如何将有效 ID 列表从一个进程直接传递给定义后续下游进程的进程?

0 投票
1 回答
1610 浏览

python - 仍然无法与 Airflow 并行运行所有任务

我有在这个问题中设置的任务。

基于 UI,看起来依赖项是明确定义的:

在此处输入图像描述

我已经测试了如下的个别任务airflow test capone_dash_preproc AAAG5608078M2 2017-07-25:这很好用,更新与该用户对应的目标数据库条目。但是,当我尝试运行完整任务时,它似乎被挂断了。python3 dash_dag.py基本上在网络服务器上导致以下终端输出,无休止地重复。所有 CPU 都是安静的,因此似乎没有发生太多计算:

我的困惑源于单个测试运行良好并填充数据库的事实。挂起和失败的是整个运行。这里有什么明显的吗?

0 投票
2 回答
839 浏览

airflow - 气流在定义中按 dag 取消暂停

有什么办法让服务器保持默认..

dags_are_paused_at_creation = True

...但是对于一个特定的 dag,默认情况下将其定义为未暂停?

0 投票
2 回答
2230 浏览

python - 在 Airflow 中生成多个任务时颠倒了上游/下游关系

可以在此处找到与此问题相关的原始代码。

我对移位运算符和set_upstream/set_downstream方法都在我在 DAG 中定义的任务循环中工作感到困惑。当DAG的主执行循环配置如下:

或者

该图如下所示(字母数字序列是用户 ID,也定义了任务 ID):

在此处输入图像描述

当我像这样配置 DAG 的主执行循环时:

或者

图表如下所示:

在此处输入图像描述

第二张图是我想要的/我期望根据我对文档的阅读产生的前两个代码片段。如果我想clear_tables在触发针对不同用户 ID 的一批数据解析任务之前先执行,我应该将其表示为clear_tables >> id_worker(uid)

编辑——这是完整的代码,自我发布最后几个问题以来已经更新,供参考:

在实施@LadislavIndra 的建议后,我继续对位移运算符进行相同的反向实施,以获得正确的依赖关系图。

更新@AshBerlin-Taylor 的回答是这里发生了什么。我假设 Graph View 和 Tree View 做同样的事情,但事实并非如此。这是id_worker(uid) >> clear_tables图表视图中的样子:

在此处输入图像描述

我当然不希望我的数据准备程序的最后一步是删除所有数据表!

0 投票
1 回答
1913 浏览

celery - 来自 UI 的 Airflow Running 任务,KeyError:没有这样的传输

与 celery 相关的气流 cfg 设置为:

airflow webserver运行正常,但是在从气流 UI 运行任务时出现错误 。从 ui 运行任务时出错

我在运行气流调度程序时出错,tracecak 是:

我的模块版本是:

  1. 气流==1.8
  2. 芹菜==4.1.0
  3. 海带==4.1.0
  4. 蟒蛇==2.7.12
0 投票
3 回答
12933 浏览

airflow - 气流:日志文件不是本地的,不受支持的远程日志位置

我无法从 Airflow UI 看到附加到任务的日志: 在此处输入图像描述

airflow.cfg 文件中的日志相关设置为:

  • remote_base_log_folder =
  • base_log_folder = /home/my_projects/ksaprice_project/airflow/logs
  • worker_log_server_port = 8793
  • child_process_log_directory = /home/my_projects/ksaprice_project/airflow/logs/scheduler

虽然我正在设置 remote_base_log_folter 它试图从中获取日志http://:8793/log/tutorial/print_date/2017-08-02T00:00:00- 我不明白这种行为。根据设置,工作人员应该将日志存储在,/home/my_projects/ksaprice_project/airflow/logs并且应该从同一位置而不是远程获取日志。

更新 task_instance 表内容: 在此处输入图像描述