问题标签 [airflow-2.x]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
89 浏览

python - BigQueryInsertJobOperator 默认项目 ID 的配置

我有一种情况,我的数据位于不同的 GCP 项目中,例如“data-pro”,而我的计算项目设置为不同的 GCP 项目,该项目可以访问“data-pro”的表。那么有没有办法指定查询必须使用的默认项目ID?我可以看到有一个默认的数据集、参数..但没有默认的 projectID。

所以我的查询如下:

select name ,id from employeedDB.employee .// 这个employeeDB在data-proc中

我的 BigQueryInsertJobOperator 配置如下:

BigQueryInsertJobOperator(dag=dag, task_id=name, gcp_conn_id=connection_id,--//connection_id over compute-proc configuration={ "query": { "query": "{% include '"+sqlFile+"' %}", " useLegacySql": False }, }, pool='bqJobPool')

0 投票
0 回答
46 浏览

authentication - Github Oauth 身份验证不适用于 Airflow 2.1.3

这是 github 在 webserver_config.py 中作为 OAUTH_PROVIDERS 的配置,用于 Airflow 2

这些是使用的标志

0 投票
1 回答
27 浏览

airflow - 气流1和气流2之间的调度程序日志不同

因为我不能使用气流 CLI,我实际上是在气流1上使用 grep 解析调度程序日志,以便检索一些信息,例如:

  • 检查 dag 是否被触发/是否成功/使用模式“INFO Marking run”开始时间戳:

[2021-12-01 11:06:50,340] {logging_mixin.py:112} 信息 - [2021-12-01 11:06:50,339] {dagrun.py:307} 信息 - 标记运行 <DagRun prd_*** @ 2021-12-01 10:02:00+00:00:预定__2021-12-01T10:02:00+00:00,外部触发:False>成功

  • 当 dag 没有被触发时,我使用模式“INFO - Created”来检索 dag 的开始时间戳:

[2021-12-01 11:04:49,213] {scheduler_job.py:1298} 信息 - 创建 <DagRun prd_*** @ 2021-12-01T10:02:00+00:00: schedule__2021-12-01T10:02 :00+00:00,外部触发:False>

它在气流 1 上运行良好,但迁移后我无法在气流2调度程序日志中找到这些数据。

是否需要更改配置?

问候, 游吟诗人

0 投票
0 回答
24 浏览

python - 给定一个 dag run id,是否可以使用 Airflow SDK 找到 dag id

给定一个 DAG 运行 ID(例如schedule__2021-11-30T09:30:00+00:00

是否可以使用 Airflow python SDK 找到 dag 和 dag 运行的 DAG id 和其他详细信息?

0 投票
0 回答
69 浏览

airflow - 气流任务运行成功但显示 SIGTERM 错误

我有一个气流任务,其日志显示运行成功,但相同的日志也显示 SIGTERM。

我知道任务已经成功,因为 datadog 在那天显示了该过程的指标推送。在气流收到外部触发器或类似的东西之后,但我确信没有人手动将任务标记为成功,我们没有任何进程可以改变该进程的状态。

这只随机发生了两次。

这是一种已知的行为还是有任何需要调查的提示。我虽然在增加AIRFLOW_CORE_KILLED_TASK_CLEANUP_TIME,但由于它是随机的,而且它发生了几次,甚至不知道如何检查它是否是解决方案,以防它工作正常。

0 投票
1 回答
51 浏览

airflow - Airflow 是否适合每秒运行 DAG?

我正在使用气流版本 2.2.2 和 celery 作为执行器。这是我想要实现的目标

  1. Dag 应该每秒运行一次
  2. 一次只能运行一个活动 dag。

以前我用 celery beat 和 celery 解决了这个案例。由于将应用程序移动到基于集群的调度程序,我们正在尝试使用气流。有人可以建议我们可以在这个用例中使用气流吗

0 投票
0 回答
33 浏览

python-3.x - DAG 将在前几天立即触发,而不是 1

我正在尝试触发 dag,并从我的个人 UI 中包含一个编辑功能。这个想法是,用户将能够编辑任何现有的时间表,无论是具有相同的 dag_id 还是不同的 dag_id。因此,我已经包含了所有参数,例如时间戳,以使我的 dag_id 唯一,这样如果用户尝试编辑现有的 dag(他可能会更改/保留现有的 dag_id),dag_id 将自动更改并运行.

在上面的代码中,我将 'mode' 作为输入,它是一个字符串和可能的值 - 'Days' / 'Moths'。基于此,将决定 cron。

'time' 参数的格式为 hh:mm,即 dag 将在特定日期开始的时间,'step' 每隔 1 或 2 或 3 或....30 天重复一次。因此,如果用户从 UI 中选择“天”,我将显示在 1-31 范围内选择的选项,如果用户选择“月”,我将显示选项 1-12。

这是我的 DAG,每个编辑都有唯一的 dag_id-

task2 is - 删除所有过去的 dag,除了正在运行的一个 task1 is - some task

问题是,当 step = 1 时,使用此代码,这将在我打算开始的日期和准确的时间 hh:mm 上执行,并且还可以正确安排未来的运行。

但是当 step = 2 或 3 等时,dag 会立即被触发,而不是我通过前端 hh:mm 提供的时间。为什么这样?我相信我的任务是 timedelta(days = step),在这种情况下它不应该发生。

例如,对于 step =2,如果今天是 2021-12-06,则第一个 dag_run 立即被触发,即 2021-12-04 的运行,下一次运行显示 2021-12-05 而不是 2021-12-06 . 但是对于 step = 1 时的相同代码,这可以正常工作。有什么建议吗?

0 投票
1 回答
49 浏览

airflow - 使用taskflow api时命名Airflow dags而不是python可调用

我尝试使用任务流 API 创建多个 dag,其中传递了一个变量,dag 中的任务可以使用该变量

例如,我正在尝试使用此代码

理想情况下,这将创建两个 ID 为 dag_1 和 dag_2 的 dag。一个 dag 将打印字符串“string1”,其他 6 个。这几乎适用于创建 1 个 dag 且 ID 为 dag_template 打印 6 的代码。

文档中有 dag 将被称为 python 可调用,是否可以覆盖它。

0 投票
1 回答
82 浏览

python - 有没有办法在非 Airflow 脚本中本地运行 SnowflakeOperator、SnowflakeHook?

我有一些使用 SnowflakeOperator 和 SnowflakeHook 的 DAG。他们都使用snowflake_connection输入连接到雪花,我保存Admin > Connections在 Airflow 下。

虽然这种安排在 Airflow 环境中有效,但我想在我的定位机器中将代码作为常规脚本运行。但它无法访问snowflake_connection我保存在 Airflow 中的那个。有什么解决方法吗?

有没有办法可以直接提供连接字符串或用户名/密码SnowflakeHookSnowflakeOperator建立数据库连接?

0 投票
0 回答
70 浏览

airflow - 气流任务成功但返回 sigterm

我有一个任务Airflow 2.1.2以成功状态完成,但在该日志显示一个 sigterm 之后:

如您所见,第一行返回Done,并且此日志的前几行显示所有脚本都运行良好并且数据已插入Datawarehouse.

在第 8 行中,它显示SIGTERM由于一些外部触发器将其标记为成功,但我确信没有人使用API,或者CLI将其标记为成功也没有UI. 知道如何避免它,为什么会发生这种情况?

我不知道是否可能增加AIRFLOW_CORE_KILLED_TASK_CLEANUP_TIME可以解决它,但我想了解它。