问题标签 [airflow]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
etl - ETL 与工作流管理以及应用哪个?它们可以一样使用吗?
我正在为客户设置数据管道。我已经在分析方面工作了很多年,但现在我在一家只有生产环境的小商店工作。我们做的第一件事是创建一个复制的生产实例,但我想应用一种数据仓库的心态来简化分析部分。
我的问题归结为使用什么工具?还有,为什么?我一直在寻找像 Talened for ETL 这样的解决方案,但也对 Airflow 非常感兴趣。问题是我不太确定哪个更适合我的需要。我想轻松监控和创建工作(我写得很流利,所以创建 Airflow 工作不是问题),但也能够在数据进入时对其进行转换。
任何建议都非常感谢
python - Airbnb Airflow 与 Apache Nifi
Airflow 和 Nifi 在工作流程上执行相同的工作吗?每个人的优点/缺点是什么?我需要读取一些 json 文件,向其中添加更多自定义元数据并将其放入 Kafka 队列中进行处理。我能够在 Nifi 中做到这一点。我仍在研究气流。我正在尝试为我的项目选择最好的工作流引擎谢谢!
python - Airflow:如何从不同的服务器进行 SSH 和运行 BashOperator
有没有办法通过 ssh 连接到不同的服务器并使用 Airbnb 的 Airflow 运行 BashOperator?我正在尝试使用 Airflow 运行 hive sql 命令,但我需要通过 SSH 连接到另一个盒子才能运行 hive shell。我的任务应该是这样的:
- SSH 到服务器 1
- 启动 Hive 外壳
- 运行 Hive 命令
谢谢!
airflow - 使用 trigger_dag 时如何避免 Airflow 回填?
我想创建一个仅在外部触发器上运行的 DAG(即,使用 'airflow trigger_dag' 命令)。但是,当我这样做时,除了我想要的 'manual_xxx' 之外,我还看到了多个 'scheduled_xxx' DagRuns。我假设创建 schedule_xxx DagRuns 是为了回填?
有没有办法只创建'manual_xxx' DagRun 而没有'scheduled_xxx' DagRuns?
我为 start_date 尝试了不同的值(过去,datetime.now() 和未来,但得到了相同的结果。这是我的玩具 DAG ...
这就是我发出 trigger_dag 命令的方式...
high-availability - 气流设置以实现高可用性
如何在高可用性下部署apache气流(正式称为airbnb的气流)调度程序?
我不是在询问显然应该部署在高可用性配置中的后端数据库或 RabbitMQ。
我的主要关注点是调度程序——有什么特别需要做的吗?
python - ImportError:运行气流时没有名为 mako.util 的模块
我正在尝试按照此处的教程进行操作:http: //pythonhosted.org/airflow/tutorial.html
但是我使用的是mac,所以我必须通过安装python brew
,然后附带pip
安装airflow
。但是,这也不太奏效,所以我尝试创建一个virtualenv
我尝试安装的airflow
,它仍然给我这个ImportError: No module named mako.util
不确定是否重要,但这是我的设置:
我如何安装这个mako.util
模块?
python - 气流中的 DAG 级别服务级别协议
我知道目前 SLA 是通过一个简单的任务实现的
sla=datetime.timedelta(hours=1)
在任务实例中,但我想知道有没有办法在气流中实现 DAG 级别的 SLA,如果 dag 中的所有任务都没有在给定的时间增量内完成,则会发送警报?
最好的问候,阿里
python - 如何使用 AirFlow 运行 python 文件的文件夹?
我在 python 文件的文件夹中有一系列 Python 任务:file1.py,file2.py,...
我阅读了 Airflow 文档,但没有看到如何在 DAG 中指定 python 文件的文件夹和文件名?
我想执行那些 python 文件(不是通过 Python Operator 的 Python 函数)。
任务1:执行file1.py(带有一些导入包)
任务2:执行file2.py(与其他一些导入包)
这会很有帮助。感谢和问候
python - 气流 trigger_dag execution_date 是第二天,为什么?
最近我测试了很多气流,execution_date
运行时有一个问题airflow trigger_dag <my-dag>
。
我了解到这execution_date
不是我们第一次从这里想到的:
Airflow 是作为 ETL 需求的解决方案而开发的。在 ETL 世界中,您通常会汇总数据。因此,如果我想汇总 2016 年 2 月 19 日的数据,我会在格林威治标准时间 2016 年 2 月 20 日午夜进行,这将是在 2016 年 2 月 19 日的所有数据可用之后。
上面的代码是我日常工作流程的开始部分,第一个任务是 TimeDeltaSensor,它在实际工作前再等 5 分钟,所以这意味着我的 dag 将在2016-09-09T00:05:00
, 2016-09-10T00:05:00
... 等处触发。
在 Web UI 中,我可以看到类似的scheduled__2016-09-20T00:00:00
内容,并且任务运行在2016-09-21T00:00:00
,根据ETL
模型,这似乎是合理的。
但是有一天我的 dag 没有因未知原因被触发,所以我手动触发它,如果我在 触发它2016-09-20T00:10:00
,那么 TimeDeltaSensor 将等到2016-09-21T00:15:00
运行之前。
这不是我想要的,我希望它2016-09-20T00:15:00
不是在第二天运行,我尝试过execution_date
通过--conf '{"execution_date": "2016-09-20"}'
,但它不起作用。
我应该如何处理这个问题?
python - HttpIO - 在数据流转换中消耗外部资源
我正在尝试Source
使用 Python Dataflow SDK 编写自定义,以从 REST 端点并行读取 JSON 数据。
例如,对于给定的一组 ID,我需要从以下位置检索数据:
https://foo.com/api/results/1
https://foo.com/api/results/2
...
https://foo.com/api/results/{maxID}
我需要的关键功能是监控和速率限制:即使我需要并行性(基于线程/进程或使用异步/协程),我也需要确保我的工作对 API 端点保持“礼貌”——有效避免非自愿DDoS。
使用psq,我应该能够实现某种速率限制机制,但是我将失去使用数据流服务监控来监控进度和 ETA 的能力
看起来,虽然它们可以很好地协同工作,但 Google Cloud Dataflow 和 Google Cloud Pub/Sub(使用 Google Stackdriver Monitoring)之间的监控并不统一
我应该如何构建一个实现速率限制并具有基于 Web 的监控的大规模并行 HTTP 使用者工作流?