问题标签 [apache-airflow]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - 如何通过 Airflow 中的 XComs 将参数从 PythonOperator 传递到 HttpSensor?
我想发送一个 HTTP 请求,其参数取决于依赖 Python 可调用的结果。我正在尝试为此目的使用 XComs。简化示例:
不幸的是,在检查了 HTTP 请求的选项后,我发现宏没有正确评估,而是发送了0
, {{ ti.xcom_pull('get_index') }}
。可能出了什么问题?我应该使用HttpOperator
代替HttpSensor
吗?
airflow - 保证一些算子会在同一个气流工作者上执行
我有一个 DAG
- 从云存储下载 csv 文件
- 通过 https 将 csv 文件上传到第三方
我正在执行的气流集群CeleryExecutor
默认使用,所以我担心在某些时候当我扩大工作人员的数量时,这些任务可能会在不同的工作人员上执行。例如。工作人员 A 进行下载,工作人员 B 尝试上传,但找不到文件(因为它在工作人员 A 上)
是否有可能以某种方式保证下载和上传操作员都将在同一个气流工作人员上执行?
airflow - 气流任务卡在“排队”状态并且永远不会运行
我正在使用 Airflow v1.8.1 并在 kubernetes 和 Docker 上运行所有组件(worker、web、flower、scheduler)。我将 Celery Executor 与 Redis 一起使用,我的任务如下所示:
所以start
任务有多个下游。我设置并发相关配置如下:
然后当我手动运行这个 DAG 时(不确定它是否永远不会发生在计划任务上),一些下游被执行,但另一些则停留在“排队”状态。
如果我从管理 UI 中清除任务,它就会被执行。没有工作日志(在处理了一些第一个下游之后,它只是不输出任何日志)。
Web 服务器的日志(不确定worker exiting
是否相关)
调度程序也没有错误日志。每当我尝试此操作时,许多卡住的任务都会发生变化。
因为我也使用 Docker,所以我想知道这是否相关: https ://github.com/puckel/docker-airflow/issues/94 但到目前为止,还没有任何线索。
有没有人遇到过类似的问题或知道我可以针对这个问题调查什么......?
python - 带有 systemd 的气流:`airflow.pid` 与 `airflow-monitor.pid`
我的 systemd 单元文件正在工作(如下)。
但是,该airflow-monitor.pid
文件暂时变为只读文件,这有时会阻止气流启动。如果发生这种情况,我们的解决方法是删除airflow-monitor.pid。这与airflow.pid 不是同一个文件。
它看起来像是airflow.pid
gunicorn 并且airflow-monitor.pid
是作为气流网络服务器的 python 进程。
systemd 单元文件:
这是 pid 文件的输出:
airflow - 没有状态的任务导致 DAG 失败
我有一个 DAG,它从 Elasticsearch 中获取数据并摄取到数据湖中。第一个任务BeginIngestion在多个任务中打开(每个资源一个),这些任务在更多任务中打开(每个分片一个)。获取分片后,将数据上传到 S3,然后关闭到任务EndIngestion,然后是任务AuditIngestion。
它执行正确,但现在所有任务都已成功执行,但“关闭任务” EndIngestion仍然没有状态。当我刷新网络服务器的页面时,DAG 被标记为Failed。
此图显示成功的上游任务,任务end_ingestion
没有状态,DAG 标记为Failed。
我还深入研究了任务实例的详细信息,发现
- Dagrun Running:任务实例的 dagrun 不是处于“运行”状态,而是处于“失败”状态。
- 触发规则:任务的触发规则 'all_success' 要求所有上游任务都已成功,但发现 1 个不成功。upstream_tasks_state = {'失败':0,'upstream_failed':0,'跳过':0,'完成':49,'成功':49},upstream_task_ids = ['s3_finish_upload_ingestion_raichucrud_complain','s3_finish_upload_ingestion_raichucrud_interaction','s3_huiccrud_complaining_up s3_finish_upload_ingestion_raichucrud_user', 's3_finish_upload_ingestion_raichucrud_privatecontactinteraction', 's3_finish_upload_ingestion_raichucrud_location', 's3_finish_upload_ingestion_raichucrud_companytoken', 's3_finish_upload_ingestion_raichucrud_indexevolution', 's3_finish_upload_ingestion_raichucrud_companyindex', '
如您所见,“触发规则”字段表示其中一项任务处于“不成功状态”,但同时统计数据显示所有上游都标记为成功。
如果我重置数据库,它不会发生,但我不能为每次执行(每小时)重置它。我也不想重置它。
有人有灯吗?
PS:我正在使用 LocalExecutor 在 EC2 实例(c4.xlarge)中运行。
[编辑] 我在调度程序日志中发现 DAG 处于死锁状态:
[2017-08-25 19:25:25,821] {models.py:4076} DagFileProcessor157 信息 - 死锁;标记运行失败
我想这可能是由于一些异常处理。
python - 如何使用 Airflow 高效管理单台机器上的资源
我在 2015 年初配备 3.1 GHz Intel Core i7 处理器和 16GB 或 RAM 的 MacBook Pro 上运行具有 +400 个任务的 Airflow 进程。
我正在运行的脚本看起来很像这样,不同之处在于我将 DAG 定义为
尽量避免并行触发太多任务。以下是我做这件事的一系列截图。我的问题是:
- 此操作会生成大量 python 进程。是否有必要以这种方式在 RAM 中定义整个任务队列,或者气流可以采取“随手生成任务”的方法来避免启动这么多进程。
- 我认为
max_active_runs
控制在任何给定时间实际有多少进程正在工作。不过,回顾我的任务,我将有几十个任务占用 CPU 资源,而其余任务则处于空闲状态。这真是低效,我该如何控制这种行为?
以下是一些屏幕截图:
事情有了一个足够好的开始,并行运行的进程比我预期的要多得多:
该过程基本上循环通过这些阶段,直到完成。最终的任务分解如下所示:
有什么想法吗?
python - 气流:让用户使用 ldap 登录
有谁知道我如何能够从气流中获取当前用户?我们启用了后端airflow/contrib/auth/backends/ldap_auth.py
,因此用户通过该身份验证登录,我想知道如何获取单击某些内容的当前用户(我们作为插件拥有的自定义视图)。
airflow - 如何在气流中使用 --conf 选项
我正在尝试运行气流 DAG,并且需要为任务传递一些参数。
如何trigger_dag
在 python DAG 文件中读取在命令行命令中作为 --conf 参数传递的 JSON 字符串。
前任:airflow trigger_dag 'dag_name' -r 'run_id' --conf '{"key":"value"}'
webserver - 如何在端口 80 上运行气流网络服务器
当我将气流网络服务器设置为在端口 80 上运行时,该服务未执行并且失败并出现以下错误:
在 AWS 上托管的 Ubuntu 16.04 上使用 systemd。如果在端口 8080 上运行,整个设置运行良好。
相关配置部分:
服务配置:
airflow - Apache Airflow 与 Google Cloud pubsub 的集成
我对气流很陌生,并尝试使用 apache 气流与谷歌 pubsub 的集成,我猜它是在“Airflow-300”JIRA 下添加的。如果我在这里阅读不正确,请纠正我。
另外,能否请您告知这是否已发布或何时发布?我们正在考虑在 Google Cloud Storage 上添加通知,在发生任何文件事件时,我们希望在 Airflow 中触发一些工作流。
我似乎找不到任何关于如何使用它的文档。
任何建议将不胜感激。