我在 XCOM 中有一个巨大的 json 文件,稍后一旦 dag 执行完成,我就不需要它了,但是我仍然在 UI 中看到带有所有数据的 Xcom 对象,有没有办法在 DAG 运行后以编程方式删除 XCOM完成了。
谢谢
我在 XCOM 中有一个巨大的 json 文件,稍后一旦 dag 执行完成,我就不需要它了,但是我仍然在 UI 中看到带有所有数据的 Xcom 对象,有没有办法在 DAG 运行后以编程方式删除 XCOM完成了。
谢谢
您可以通过 sqlalchemy 以编程方式执行清理,因此如果数据库结构发生更改,您的解决方案不会中断:
from airflow.utils.db import provide_session
from airflow.models import XCom
@provide_session
def cleanup_xcom(session=None):
session.query(XCom).filter(XCom.dag_id == "your dag id").delete()
您还可以清除旧的 XCom 数据:
from airflow.utils.db import provide_session
from airflow.models import XCom
from sqlalchemy import func
@provide_session
def cleanup_xcom(session=None):
session.query(XCom).filter(XCom.execution_date <= func.date('2019-06-01')).delete()
如果你想在 dag 完成后清除 XCom,我认为最干净的解决方案是使用 DAG 模型类的“on_success_callback”属性:
from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom
@provide_session
def cleanup_xcom(context, session=None):
dag_id = context["ti"]["dag_id"]
session.query(XCom).filter(XCom.dag_id == dag_id).delete()
dag = DAG( ...
on_success_callback=cleanup_xcom,
)
您必须添加一个任务取决于您的元数据数据库(sqllite、PostgreSql、MySql..),一旦 DAG 运行完成,它会删除 XCOM。
delete_xcom_task = PostgresOperator(
task_id='delete-xcom-task',
postgres_conn_id='airflow_db',
sql="delete from xcom where dag_id=dag.dag_id and
task_id='your_task_id' and execution_date={{ ds }}",
dag=dag)
您可以在运行 dag 之前验证您的查询。
数据分析 -> 即席查询 -> 气流数据库 -> 查询 -> 运行!
下面是对我有用的代码,这将删除 DAG 中所有任务的 xcom(如果只需要删除特定任务的 xcom,则将 task_id 添加到 SQL):
由于dag_id是动态的,日期应该遵循各自的 SQL 语法。
from airflow.operators.postgres_operator import PostgresOperator
delete_xcom_task_inst = PostgresOperator(task_id='delete_xcom',
postgres_conn_id='your_conn_id',
sql="delete from xcom where dag_id= '"+dag.dag_id+"' and date(execution_date)=date('{{ ds }}')"
)
我对这个问题的解决方案是:
from airflow.utils.db import provide_session
from airflow.models import XCom
dag = DAG(...)
@provide_session
def cleanup_xcom(**context):
dag = context["dag"]
dag_id = dag._dag_id
session=context["session"]
session.query(XCom).filter(XCom.dag_id == dag_id).delete()
clean_xcom = PythonOperator(
task_id="clean_xcom",
python_callable = cleanup_xcom,
provide_context=True,
dag=dag
)
clean_xcom
在 Airflow 2.1.x 中,下面的代码喜欢不工作......
from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom
@provide_session
def cleanup_xcom(context, session=None):
dag_id = context["ti"]["dag_id"]
session.query(XCom).filter(XCom.dag_id == dag_id).delete()
dag = DAG( ...
on_success_callback=cleanup_xcom,
)
所以改为
from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
with DAG(dag_id="cleanup_xcom_demo", schedule_interval=None, start_date=days_ago(2)) as dag:
# cleanup_xcom
@provide_session
def cleanup_xcom(session=None, **context):
dag = context["dag"]
dag_id = dag._dag_id
# It will delete all xcom of the dag_id
session.query(XCom).filter(XCom.dag_id == dag_id).delete()
clean_xcom = PythonOperator(
task_id="clean_xcom",
python_callable = cleanup_xcom,
provide_context=True,
# dag=dag
)
start = DummyOperator(task_id="start")
end = DummyOperator(task_id="end", trigger_rule="none_failed")
start >> clean_xcom >> end
使用
from sqlalchemy import func
[...]
session.query(XCom).filter(XCom.execution_date <= func.date('2019-06-01')).delete()
按日期过滤(如上所述)对我不起作用。相反,我必须提供一个日期时间(包括时区):
from airflow.models import XCom
from datetime import datetime, timedelta, timezone
[...]
@provide_session
def cleanup_xcom(session=None):
ts_limit = datetime.now(timezone.utc) - timedelta(days=2)
session.query(XCom).filter(XCom.execution_date <= ts_limit).delete()
logging.info(f"deleted all XCOMs older than {ts_limit}")
xcom_cleaner = python_operator.PythonOperator(
task_id='delete-old-xcoms',
python_callable=cleanup_xcom)
xcom_cleaner