问题标签 [airflow]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
808 浏览

etl - ETL 与工作流管理以及应用哪个?它们可以一样使用吗?

我正在为客户设置数据管道。我已经在分析方面工作了很多年,但现在我在一家只有生产环境的小商店工作。我们做的第一件事是创建一个复制的生产实例,但我想应用一种数据仓库的心态来简化分析部分。

我的问题归结为使用什么工具?还有,为什么?我一直在寻找像 Talened for ETL 这样的解决方案,但也对 Airflow 非常感兴趣。问题是我不太确定哪个更适合我的需要。我想轻松监控和创建工作(我写得很流利,所以创建 Airflow 工作不是问题),但也能够在数据进入时对其进行转换。

任何建议都非常感谢

0 投票
1 回答
33872 浏览

python - Airbnb Airflow 与 Apache Nifi

Airflow 和 Nifi 在工作流程上执行相同的工作吗?每个人的优点/缺点是什么?我需要读取一些 json 文件,向其中添加更多自定义元数据并将其放入 Kafka 队列中进行处理。我能够在 Nifi 中做到这一点。我仍在研究气流。我正在尝试为我的项目选择最好的工作流引擎谢谢!

0 投票
3 回答
42022 浏览

python - Airflow:如何从不同的服务器进行 SSH 和运行 BashOperator

有没有办法通过 ssh 连接到不同的服务器并使用 Airbnb 的 Airflow 运行 BashOperator?我正在尝试使用 Airflow 运行 hive sql 命令,但我需要通过 SSH 连接到另一个盒子才能运行 hive shell。我的任务应该是这样的:

  1. SSH 到服务器 1
  2. 启动 Hive 外壳
  3. 运行 Hive 命令

谢谢!

0 投票
1 回答
671 浏览

airflow - 使用 trigger_dag 时如何避免 Airflow 回填?

我想创建一个仅在外部触发器上运行的 DAG(即,使用 'airflow trigger_dag' 命令)。但是,当我这样做时,除了我想要的 'manual_xxx' 之外,我还看到了多个 'scheduled_xxx' DagRuns。我假设创建 schedule_xxx DagRuns 是为了回填?

有没有办法只创建'manual_xxx' DagRun 而没有'scheduled_xxx' DagRuns?

我为 start_date 尝试了不同的值(过去,datetime.now() 和未来,但得到了相同的结果。这是我的玩具 DAG ...

这就是我发出 trigger_dag 命令的方式...

0 投票
2 回答
7437 浏览

high-availability - 气流设置以实现高可用性

如何在高可用性下部署apache气流(正式称为airbnb的气流)调度程序?

我不是在询问显然应该部署在高可用性配置中的后端数据库或 RabbitMQ。

我的主要关注点是调度程序——有什么特别需要做的吗?

0 投票
1 回答
1183 浏览

python - ImportError:运行气流时没有名为 mako.util 的模块

我正在尝试按照此处的教程进行操作:http: //pythonhosted.org/airflow/tutorial.html

但是我使用的是mac,所以我必须通过安装python brew,然后附带pip安装airflow。但是,这也不太奏效,所以我尝试创建一个virtualenv我尝试安装的airflow,它仍然给我这个ImportError: No module named mako.util

不确定是否重要,但这是我的设置:

我如何安装这个mako.util模块?

0 投票
0 回答
1890 浏览

python - 气流中的 DAG 级别服务级别协议

我知道目前 SLA 是通过一个简单的任务实现的

sla=datetime.timedelta(hours=1)

在任务实例中,但我想知道有没有办法在气流中实现 DAG 级别的 SLA,如果 dag 中的所有任务都没有在给定的时间增量内完成,则会发送警报?

最好的问候,阿里

0 投票
3 回答
27278 浏览

python - 如何使用 AirFlow 运行 python 文件的文件夹?

我在 python 文件的文件夹中有一系列 Python 任务:file1.py,file2.py,...

我阅读了 Airflow 文档,但没有看到如何在 DAG 中指定 python 文件的文件夹和文件名?

我想执行那些 python 文件(不是通过 Python Operator 的 Python 函数)。

任务1:执行file1.py(带有一些导入包)

任务2:执行file2.py(与其他一些导入包)

这会很有帮助。感谢和问候

0 投票
2 回答
23651 浏览

python - 气流 trigger_dag execution_date 是第二天,为什么?

最近我测试了很多气流,execution_date运行时有一个问题airflow trigger_dag <my-dag>

我了解到这execution_date不是我们第一次从这里想到的:

Airflow 是作为 ETL 需求的解决方案而开发的。在 ETL 世界中,您通常会汇总数据。因此,如果我想汇总 2016 年 2 月 19 日的数据,我会在格林威治标准时间 2016 年 2 月 20 日午夜进行,这将是在 2016 年 2 月 19 日的所有数据可用之后。

上面的代码是我日常工作流程的开始部分,第一个任务是 TimeDeltaSensor,它在实际工作前再等 5 分钟,所以这意味着我的 dag 将在2016-09-09T00:05:00, 2016-09-10T00:05:00... 等处触发。

在 Web UI 中,我可以看到类似的scheduled__2016-09-20T00:00:00内容,并且任务运行在2016-09-21T00:00:00,根据ETL模型,这似乎是合理的。

但是有一天我的 dag 没有因未知原因被触发,所以我手动触发它,如果我在 触发它2016-09-20T00:10:00,那么 TimeDeltaSensor 将等到2016-09-21T00:15:00运行之前。

这不是我想要的,我希望它2016-09-20T00:15:00不是在第二天运行,我尝试过execution_date通过--conf '{"execution_date": "2016-09-20"}',但它不起作用。

我应该如何处理这个问题?

0 投票
1 回答
186 浏览

python - HttpIO - 在数据流转换中消耗外部资源

我正在尝试Source使用 Python Dataflow SDK 编写自定义,以从 REST 端点并行读取 JSON 数据。

例如,对于给定的一组 ID,我需要从以下位置检索数据: https://foo.com/api/results/1 https://foo.com/api/results/2 ... https://foo.com/api/results/{maxID}

我需要的关键功能是监控和速率限制:即使我需要并行性(基于线程/进程或使用异步/协程),我也需要确保我的工作对 API 端点保持“礼貌”——有效避免非自愿DDoS。

使用psq,我应该能够实现某种速率限制机制,但是我将失去使用数据流服务监控来监控进度和 ETA 的能力

看起来,虽然它们可以很好地协同工作,但 Google Cloud Dataflow 和 Google Cloud Pub/Sub(使用 Google Stackdriver Monitoring)之间的监控并不统一

我应该如何构建一个实现速率限制并具有基于 Web 的监控的大规模并行 HTTP 使用者工作流?