问题标签 [airflow]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
3 回答
28737 浏览

python - 如何将 Airflow dag 配置为每天在特定时间运行?

无论发生什么,如何将 Airflow dag 配置为每天在指定时间执行,就像 crons 一样。

我知道使用 TimeSensor 可以获得类似的行为,但在这种情况下,它取决于传感器任务,并且可能与 dag 执行时间冲突。

示例:使用传感器方法,如果我有传感器在 0 小时 15 分钟运行,但如果 dag 稍后执行,那么我的任务就会延迟,所以即使是传感器方法,我也需要确保 Dag 在正确的时间执行。

那么如何确保 Dag 在指定时间执行呢?

0 投票
2 回答
858 浏览

hadoop - 用于构建基于 hadoop 的数据管道的调度工具的建议

在 Apache Oozie、Spotify/Luigiairbnb/airflow之间,它们各自的优缺点是什么?

我过去曾使用 oozie 和气流来构建使用 PIG 和 Hive 的数据摄取管道。目前,我正在构建一个查看日志并提取有用事件并将它们置于红移状态的管道。

我发现气流更容易使用/测试/设置。它有一个更酷的 UI,并允许用户从 UI 本身执行操作,这与 Oozie 不同。欢迎提供有关 Luigi 的任何信息或其他有关稳定性和问题的见解。

0 投票
1 回答
2090 浏览

python - Airflow DAG 没有收听@monthly schedule_interval

我正在尝试在 Airflow 中安排每月一次的 DAG。即使它每天都在运行dag.schedule_interval = '@monthly'(根据此处的说明http://airflow.readthedocs.org/en/latest/scheduler.html#dag-runs)。我也设置了dag_args['start_date'] = datetime(2016, 2, 1, 20, 0)and starts_at = time(13,0)。任何想法为什么会发生这种情况?

我应该改用以下设置吗?

dag_args['start_date'] = datetime(2016, 2, 1, 0, 0)

dag_args['schedule_interval'] = '0 19 1 * *'

0 投票
2 回答
4205 浏览

python - 气流 cron 表达式未正确调度 dag

我正在探索将 Airflow 用作 cron,以便在设置 cron 时可以使用它的其他功能。

我通过设置 cron like 来测试它的功能 "2,3,5,8, * * * *"。我期待特定的 dag 安排在每小时的第 2、3、5 和 8 分钟。然而实际上,第 2 分钟的 dag 在 3 日执行,3 日在 5 日执行,5 日在 8 日执行。而且它根本没有执行到第 8 次。我想它将在下一小时的第 2 分钟执行第 8 次。

看起来像是气流中 cron 表达式的某种错误或未处理的情况。

0 投票
2 回答
1839 浏览

python - How to deploy modified airflow dag from a different start time?

Lets say scheduler is stopped for 5 hours and I had dag scheduled for twice every hour. Now when I restart the scheduler I do not want to airflow to backfill all the instances those were missed, Instead I want it to continue from the current hour.

0 投票
2 回答
3894 浏览

python - 在 Python 的 Airflow 中,如何在特定时间后停止任务运行?

我正在尝试使用 Python 的 Airflow 库。我希望它定期抓取网页。

我遇到的问题是,如果我start_date是几天前,当我启动调度程序时,它将从start_date今天回填。例如:

假设今天是本月的 20 日。

假设start_date是本月的 15 日。

如果我在 20 号启动调度程序,它将在 20 号刮掉页面 5 次。它将看到一个 DAG 实例应该在 15 日运行,并将在 20 日运行该 DAG 实例(15 日的实例)。然后它将在 20 日 16 日运行 DAG 实例,以此类推。

简而言之,Airflow 会尝试“赶上”,但这对于网页抓取来说没有意义。

有什么方法可以让 Airflow 在一定时间后认为 DAG 实例失败?

0 投票
5 回答
83641 浏览

airflow - 如何正确工作气流 schedule_interval

我想尝试使用 Airflow 而不是 Cron。但是 schedule_interval 并没有像我预期的那样工作。

我写了如下的python代码。
据我了解,Airflow 应该在“2016/03/30 8:15:00”上运行,但当时它没有工作。

如果我像这样更改它“'schedule_interval': timedelta(minutes = 5)”,我认为它可以正常工作。

“notice_slack.sh”只是将 slack api 调用到我的频道。

我想每天在特定时间运行我的一些脚本,就像这个 cron 设置一样。

我已阅读文档Scheduling & Triggers,我知道它与 cron 有点不同。
所以我尝试安排在“start_date”和“schedule_interval”设置。

有谁知道我该怎么办?

气流版本

信息 - 使用执行器 LocalExecutor

v1.7.0

amazon-linux-ami/2015.09-release-notes

0 投票
4 回答
3484 浏览

celery - 气流:何时使用 CeleryExecutor 以及何时使用 MesosExecutor

我对 Airflow 很陌生,并试图了解我们应该如何在我们的环境中设置它(在 aws 上)。

我读过 Airflow 使用 Celery 和 redis 代理。它与 Mesos 有何不同?我以前没有使用过 Celery,但我尝试在我的开发机器上设置 celery-redis,它很容易工作。但是添加新组件意味着添加更多监控。

由于我们已经使用 mesos 进行集群管理,所以我想如果我不选择 celery 而使用 MesosExecutor 会错过什么?

0 投票
7 回答
109606 浏览

airflow - execution_date in airflow: need to access as a variable

I am really a newbie in this forum. But I have been playing with airflow, for sometime, for our company. Sorry if this question sounds really dumb.

I am writing a pipeline using bunch of BashOperators. Basically, for each Task, I want to simply call a REST api using 'curl'

This is what my pipeline looks like(very simplified version):

If you notice I am doing current_datetime= datetime_obj.now(tz=tz.tzlocal()) Instead what I want here is 'execution_date'

How do I use 'execution_date' directly and assign it to a variable in my python file?

I have having this general issue of accessing args. Any help will be genuinely appreciated.

Thanks

0 投票
4 回答
23980 浏览

airflow - 配置 Airflow 以使用 CeleryExecutor

我尝试将 Airbnb AirFlow 配置为使用 CeleryExecutor,如下所示:

我将executerairflow.cfg中的从更改SequentialExecutorCeleryExecutor

但我收到以下错误:

请注意,sql_alchemy_conn配置如下:

我查看了 Airflow 的 GIT(https://github.com/airbnb/airflow/blob/master/airflow/configuration.py

发现下面的代码抛出了这个异常:

从这种validate方法看来,sql_alchemy_conn不能包含sqlite.

你知道如何配置CeleryExecutor没有 sqllite 吗?请注意,我下载了 rabitMQ 以根据需要与 CeleryExecuter 一起使用。