17

我在 XCOM 中有一个巨大的 json 文件,稍后一旦 dag 执行完成,我就不需要它了,但是我仍然在 UI 中看到带有所有数据的 Xcom 对象,有没有办法在 DAG 运行后以编程方式删除 XCOM完成了。

谢谢

4

5 回答 5

13

您可以通过 sqlalchemy 以编程方式执行清理,因此如果数据库结构发生更改,您的解决方案不会中断:

from airflow.utils.db import provide_session
from airflow.models import XCom

@provide_session
def cleanup_xcom(session=None):
    session.query(XCom).filter(XCom.dag_id == "your dag id").delete()

您还可以清除旧的 XCom 数据:

from airflow.utils.db import provide_session
from airflow.models import XCom
from sqlalchemy import func

@provide_session
def cleanup_xcom(session=None):
    session.query(XCom).filter(XCom.execution_date <= func.date('2019-06-01')).delete()

如果你想在 dag 完成后清除 XCom,我认为最干净的解决方案是使用 DAG 模型类的“on_success_callback”属性:

from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom

@provide_session
def cleanup_xcom(context, session=None):
    dag_id = context["ti"]["dag_id"]
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()

dag = DAG( ...
    on_success_callback=cleanup_xcom,
)
于 2019-12-13T17:42:27.297 回答
11

您必须添加一个任务取决于您的元数据数据库(sqllite、PostgreSql、MySql..),一旦 DAG 运行完成,它会删除 XCOM。

delete_xcom_task = PostgresOperator(
      task_id='delete-xcom-task',
      postgres_conn_id='airflow_db',
      sql="delete from xcom where dag_id=dag.dag_id and 
           task_id='your_task_id' and execution_date={{ ds }}",
      dag=dag)

您可以在运行 dag 之前验证您的查询。

数据分析 -> 即席查询 -> 气流数据库 -> 查询 -> 运行!

xcom 元数据

于 2017-10-12T12:25:40.843 回答
3

下面是对我有用的代码,这将删除 DAG 中所有任务的 xcom(如果只需要删除特定任务的 xcom,则将 task_id 添加到 SQL):

由于dag_id是动态的,日期应该遵循各自的 SQL 语法。

from airflow.operators.postgres_operator import PostgresOperator

delete_xcom_task_inst = PostgresOperator(task_id='delete_xcom',
                                            postgres_conn_id='your_conn_id',
                                            sql="delete from xcom where dag_id= '"+dag.dag_id+"' and date(execution_date)=date('{{ ds }}')"
                                            )
于 2019-08-28T05:43:37.120 回答
3

我对这个问题的解决方案是:

from airflow.utils.db import provide_session
from airflow.models import XCom

dag = DAG(...)

@provide_session
def cleanup_xcom(**context):     
    dag = context["dag"]
    dag_id = dag._dag_id 
    session=context["session"]
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()

clean_xcom = PythonOperator(
    task_id="clean_xcom",
    python_callable = cleanup_xcom,
    provide_context=True, 
    dag=dag
)

clean_xcom

在 Airflow 2.1.x 中,下面的代码喜欢不工作......

from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom

@provide_session
def cleanup_xcom(context, session=None):
    dag_id = context["ti"]["dag_id"]
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()

dag = DAG( ...
    on_success_callback=cleanup_xcom,
)

所以改为

from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

with DAG(dag_id="cleanup_xcom_demo", schedule_interval=None, start_date=days_ago(2)) as dag:
    # cleanup_xcom
    @provide_session
    def cleanup_xcom(session=None, **context):
        dag = context["dag"]
        dag_id = dag._dag_id 
        # It will delete all xcom of the dag_id
        session.query(XCom).filter(XCom.dag_id == dag_id).delete()

    clean_xcom = PythonOperator(
        task_id="clean_xcom",
        python_callable = cleanup_xcom,
        provide_context=True, 
        # dag=dag
    )
    
    start  = DummyOperator(task_id="start")
    end = DummyOperator(task_id="end", trigger_rule="none_failed")
    
    start >> clean_xcom >> end

于 2020-11-14T13:48:42.460 回答
0

使用

from sqlalchemy import func 
[...]
session.query(XCom).filter(XCom.execution_date <= func.date('2019-06-01')).delete()

按日期过滤(如上所述)对我不起作用。相反,我必须提供一个日期时间(包括时区):

from airflow.models import XCom
from datetime import datetime, timedelta, timezone

[...]

@provide_session
def cleanup_xcom(session=None):
    ts_limit = datetime.now(timezone.utc) - timedelta(days=2)
    session.query(XCom).filter(XCom.execution_date <= ts_limit).delete()
    logging.info(f"deleted all XCOMs older than {ts_limit}")

xcom_cleaner = python_operator.PythonOperator(
    task_id='delete-old-xcoms',
    python_callable=cleanup_xcom)

xcom_cleaner 
于 2020-10-21T06:32:17.560 回答