问题标签 [airflow-2.x]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - BigQueryInsertJobOperator 默认项目 ID 的配置
我有一种情况,我的数据位于不同的 GCP 项目中,例如“data-pro”,而我的计算项目设置为不同的 GCP 项目,该项目可以访问“data-pro”的表。那么有没有办法指定查询必须使用的默认项目ID?我可以看到有一个默认的数据集、参数..但没有默认的 projectID。
所以我的查询如下:
select name ,id from employeedDB.employee .// 这个employeeDB在data-proc中
我的 BigQueryInsertJobOperator 配置如下:
BigQueryInsertJobOperator(dag=dag, task_id=name, gcp_conn_id=connection_id,--//connection_id over compute-proc configuration={ "query": { "query": "{% include '"+sqlFile+"' %}", " useLegacySql": False }, }, pool='bqJobPool')
authentication - Github Oauth 身份验证不适用于 Airflow 2.1.3
这是 github 在 webserver_config.py 中作为 OAUTH_PROVIDERS 的配置,用于 Airflow 2
这些是使用的标志
airflow - 气流1和气流2之间的调度程序日志不同
因为我不能使用气流 CLI,我实际上是在气流1上使用 grep 解析调度程序日志,以便检索一些信息,例如:
- 检查 dag 是否被触发/是否成功/使用模式“INFO Marking run”开始时间戳:
[2021-12-01 11:06:50,340] {logging_mixin.py:112} 信息 - [2021-12-01 11:06:50,339] {dagrun.py:307} 信息 - 标记运行 <DagRun prd_*** @ 2021-12-01 10:02:00+00:00:预定__2021-12-01T10:02:00+00:00,外部触发:False>成功
- 当 dag 没有被触发时,我使用模式“INFO - Created”来检索 dag 的开始时间戳:
[2021-12-01 11:04:49,213] {scheduler_job.py:1298} 信息 - 创建 <DagRun prd_*** @ 2021-12-01T10:02:00+00:00: schedule__2021-12-01T10:02 :00+00:00,外部触发:False>
它在气流 1 上运行良好,但迁移后我无法在气流2调度程序日志中找到这些数据。
是否需要更改配置?
问候, 游吟诗人
python - 给定一个 dag run id,是否可以使用 Airflow SDK 找到 dag id
给定一个 DAG 运行 ID(例如schedule__2021-11-30T09:30:00+00:00)
是否可以使用 Airflow python SDK 找到 dag 和 dag 运行的 DAG id 和其他详细信息?
airflow - 气流任务运行成功但显示 SIGTERM 错误
我有一个气流任务,其日志显示运行成功,但相同的日志也显示 SIGTERM。
我知道任务已经成功,因为 datadog 在那天显示了该过程的指标推送。在气流收到外部触发器或类似的东西之后,但我确信没有人手动将任务标记为成功,我们没有任何进程可以改变该进程的状态。
这只随机发生了两次。
这是一种已知的行为还是有任何需要调查的提示。我虽然在增加AIRFLOW_CORE_KILLED_TASK_CLEANUP_TIME
,但由于它是随机的,而且它发生了几次,甚至不知道如何检查它是否是解决方案,以防它工作正常。
airflow - Airflow 是否适合每秒运行 DAG?
我正在使用气流版本 2.2.2 和 celery 作为执行器。这是我想要实现的目标
- Dag 应该每秒运行一次
- 一次只能运行一个活动 dag。
以前我用 celery beat 和 celery 解决了这个案例。由于将应用程序移动到基于集群的调度程序,我们正在尝试使用气流。有人可以建议我们可以在这个用例中使用气流吗
python-3.x - DAG 将在前几天立即触发,而不是 1
我正在尝试触发 dag,并从我的个人 UI 中包含一个编辑功能。这个想法是,用户将能够编辑任何现有的时间表,无论是具有相同的 dag_id 还是不同的 dag_id。因此,我已经包含了所有参数,例如时间戳,以使我的 dag_id 唯一,这样如果用户尝试编辑现有的 dag(他可能会更改/保留现有的 dag_id),dag_id 将自动更改并运行.
在上面的代码中,我将 'mode' 作为输入,它是一个字符串和可能的值 - 'Days' / 'Moths'。基于此,将决定 cron。
'time' 参数的格式为 hh:mm,即 dag 将在特定日期开始的时间,'step' 每隔 1 或 2 或 3 或....30 天重复一次。因此,如果用户从 UI 中选择“天”,我将显示在 1-31 范围内选择的选项,如果用户选择“月”,我将显示选项 1-12。
这是我的 DAG,每个编辑都有唯一的 dag_id-
task2 is - 删除所有过去的 dag,除了正在运行的一个 task1 is - some task
问题是,当 step = 1 时,使用此代码,这将在我打算开始的日期和准确的时间 hh:mm 上执行,并且还可以正确安排未来的运行。
但是当 step = 2 或 3 等时,dag 会立即被触发,而不是我通过前端 hh:mm 提供的时间。为什么这样?我相信我的任务是 timedelta(days = step),在这种情况下它不应该发生。
例如,对于 step =2,如果今天是 2021-12-06,则第一个 dag_run 立即被触发,即 2021-12-04 的运行,下一次运行显示 2021-12-05 而不是 2021-12-06 . 但是对于 step = 1 时的相同代码,这可以正常工作。有什么建议吗?
airflow - 使用taskflow api时命名Airflow dags而不是python可调用
我尝试使用任务流 API 创建多个 dag,其中传递了一个变量,dag 中的任务可以使用该变量
例如,我正在尝试使用此代码
理想情况下,这将创建两个 ID 为 dag_1 和 dag_2 的 dag。一个 dag 将打印字符串“string1”,其他 6 个。这几乎适用于创建 1 个 dag 且 ID 为 dag_template 打印 6 的代码。
文档中有 dag 将被称为 python 可调用,是否可以覆盖它。
python - 有没有办法在非 Airflow 脚本中本地运行 SnowflakeOperator、SnowflakeHook?
我有一些使用 SnowflakeOperator 和 SnowflakeHook 的 DAG。他们都使用snowflake_connection
输入连接到雪花,我保存Admin > Connections
在 Airflow 下。
虽然这种安排在 Airflow 环境中有效,但我想在我的定位机器中将代码作为常规脚本运行。但它无法访问snowflake_connection
我保存在 Airflow 中的那个。有什么解决方法吗?
有没有办法可以直接提供连接字符串或用户名/密码SnowflakeHook
并SnowflakeOperator
建立数据库连接?
airflow - 气流任务成功但返回 sigterm
我有一个任务Airflow 2.1.2
以成功状态完成,但在该日志显示一个 sigterm 之后:
如您所见,第一行返回Done
,并且此日志的前几行显示所有脚本都运行良好并且数据已插入Datawarehouse
.
在第 8 行中,它显示SIGTERM
由于一些外部触发器将其标记为成功,但我确信没有人使用API
,或者CLI
将其标记为成功也没有UI
. 知道如何避免它,为什么会发生这种情况?
我不知道是否可能增加AIRFLOW_CORE_KILLED_TASK_CLEANUP_TIME
可以解决它,但我想了解它。