问题标签 [apache-airflow]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
airflow - 在气流中,有没有一种好方法可以调用另一个 dag 的任务?
我有 dag_prime 和 dag_tertiary。
dag_prime
:扫描目录并打算在每个目录上调用 dag_tertiary。目前是 PythonOperator。dag_tertiary
:扫描传递给它的目录并对其内容进行(可能是时间密集的)计算。
我可以从 python 操作员的系统调用中调用第二个,但我觉得必须有更好的方法。如果有一种简单的方法可以做到这一点,我还想考虑对 dag_tertiary 调用进行排队。有没有比使用系统调用更好的方法?
谢谢!
airflow - 在气流主节点上运行特定任务
我有一个任务列表,其中包含在不同工作节点上使用 celery 执行程序运行的任务列表。但是,我想在主节点上运行其中一项任务。那可能吗?
directed-acyclic-graphs - 气流:每个任务运行大量实例时需要建议
这是我在 Stack 上的第一篇文章,是关于 Airflow 的。我需要实现一个 DAG,它将:
1/ 从 API 下载文件
2/ 将它们上传到 Google Cloud Storage
3/ 将它们插入 BigQuery
问题是第 1 步涉及大约 170 个要调用的帐户。如果在下载过程中出现任何错误,我希望我的 DAG 从异常结束的步骤自动重试。因此,我在我的任务之上实现了一个循环,例如:
所以在 UI 级别,我有大约 170 个每个任务显示的实例。当我手动运行 DAG 时,就我所见,Airflow 什么也没做。DAG 不会初始化或排队任何任务实例。我想这是由于涉及的实例数量,但我不知道如何解决这个问题。
我应该如何管理这么多任务实例?
谢谢,
亚历克斯
python - 气流默认 on_failure_callback
在我的 DAG 文件中,我定义了一个 on_failure_callback() 函数来发布 Slack 以防失败。
如果我为我的 DAG 中的每个运算符指定它会很好:on_failure_callback=on_failure_callback()
有没有办法自动化(例如通过 default_args 或通过我的 DAG 对象)向我的所有操作员分派?
python - 如何动态迭代上游任务的输出以在气流中创建并行任务?
考虑以下 DAG 示例,其中第一个任务 ,get_id_creds
从数据库中提取凭据列表。此操作告诉我数据库中的哪些用户可以运行进一步的数据预处理,并将这些 id 写入文件/tmp/ids.txt
。然后我将这些 id 扫描到我的 DAG 中,并使用它们生成upload_transaction
可以并行运行的任务列表。
我的问题是:有没有更惯用的正确、动态的方式来使用气流来做到这一点?我这里的东西感觉笨拙而脆弱。如何将有效 ID 列表从一个进程直接传递给定义后续下游进程的进程?
python - 仍然无法与 Airflow 并行运行所有任务
我有在这个问题中设置的任务。
基于 UI,看起来依赖项是明确定义的:
我已经测试了如下的个别任务airflow test capone_dash_preproc AAAG5608078M2 2017-07-25
:这很好用,更新与该用户对应的目标数据库条目。但是,当我尝试运行完整任务时,它似乎被挂断了。python3 dash_dag.py
基本上在网络服务器上导致以下终端输出,无休止地重复。所有 CPU 都是安静的,因此似乎没有发生太多计算:
我的困惑源于单个测试运行良好并填充数据库的事实。挂起和失败的是整个运行。这里有什么明显的吗?
airflow - 气流在定义中按 dag 取消暂停
有什么办法让服务器保持默认..
dags_are_paused_at_creation = True
...但是对于一个特定的 dag,默认情况下将其定义为未暂停?
python - 在 Airflow 中生成多个任务时颠倒了上游/下游关系
可以在此处找到与此问题相关的原始代码。
我对移位运算符和set_upstream
/set_downstream
方法都在我在 DAG 中定义的任务循环中工作感到困惑。当DAG的主执行循环配置如下:
或者
该图如下所示(字母数字序列是用户 ID,也定义了任务 ID):
当我像这样配置 DAG 的主执行循环时:
或者
图表如下所示:
第二张图是我想要的/我期望根据我对文档的阅读产生的前两个代码片段。如果我想clear_tables
在触发针对不同用户 ID 的一批数据解析任务之前先执行,我应该将其表示为clear_tables >> id_worker(uid)
编辑——这是完整的代码,自我发布最后几个问题以来已经更新,供参考:
在实施@LadislavIndra 的建议后,我继续对位移运算符进行相同的反向实施,以获得正确的依赖关系图。
更新@AshBerlin-Taylor 的回答是这里发生了什么。我假设 Graph View 和 Tree View 做同样的事情,但事实并非如此。这是id_worker(uid) >> clear_tables
图表视图中的样子:
我当然不希望我的数据准备程序的最后一步是删除所有数据表!
airflow - 气流:日志文件不是本地的,不受支持的远程日志位置
airflow.cfg 文件中的日志相关设置为:
remote_base_log_folder =
base_log_folder = /home/my_projects/ksaprice_project/airflow/logs
worker_log_server_port = 8793
child_process_log_directory = /home/my_projects/ksaprice_project/airflow/logs/scheduler
虽然我正在设置 remote_base_log_folter 它试图从中获取日志http://:8793/log/tutorial/print_date/2017-08-02T00:00:00
- 我不明白这种行为。根据设置,工作人员应该将日志存储在,/home/my_projects/ksaprice_project/airflow/logs
并且应该从同一位置而不是远程获取日志。