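I am working on a data monitoring task and use the Great Expectations framework to monitor data quality. The setup is Airflow + BigQuery + Great Expectations.
I have set the parameter is_blocking: False for the expectation, but the job still aborts with an exception, so the downstream tasks are not executed. Is there a way to send the notification without stopping execution?
The detailed exception is as follows: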
[2021-11-29 15:19:45,925] {taskinstance.py:1252} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=data-science
AIRFLOW_CTX_DAG_ID=abcd-data-ds-1
AIRFLOW_CTX_TASK_ID=ge-notify-_data_monitoring-expect_-5ff9677f
AIRFLOW_CTX_EXECUTION_DATE=2021-11-29T11:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-11-29T11:00:00+00:00
[2021-11-29 15:19:45,926] {great_expectations_notification_operator.py:42} INFO - Retrieving key data-ds-v4__promo_roi_input_features_monitoring_expect_column_values_to_be_between47deadf091f092857156a30495953f3c_20211129T110000
[2021-11-29 15:19:45,986] {alerts.py:109} INFO - Sending slack notification
[2021-11-29 15:19:46,411] {great_expectations_notification_operator.py:73} ERROR - Validation failed in datawarehouse for abcd.xyz.is_outlier
[2021-11-29 15:19:46,430] {taskinstance.py:1463} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1165, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1283, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1308, in _execute_task
    result = task_copy.execute(context=context)
  File "/opt/airflow/src/datahub/operators/expectations/great_expectations_notification_operator.py", line 79, in execute
    raise AirflowException(message)
airflow.exceptions.AirflowException: Validation failed in datawarehouse for abcd.xyz.is_outlier
[2021-11-29 15:19:46,432] {taskinstance.py:1506} INFO - Marking task as FAILED. dag_id=curated-data-ds-v4, task_id=ge-notify-data_monitoring-expect_-5ff9677f, execution_date=20211129T110000, start_date=20211129T151945, end_date=20211129T151946
[2021-11-29 15:19:46,505] {local_task_job.py:151} INFO - Task exited with return code 1
[2021-11-29 15:19:46,557] {alerts.py:109} INFO - Sending slack notification
[2021-11-29 15:19:47,564] {local_task_job.py:261} INFO - 0 downstream tasks scheduled from follow-on schedule check
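
The traceback shows the AirflowException being raised from the custom operator's execute method (great_expectations_notification_operator.py, line 79), after the Slack notification has already been sent. The behaviour asked for could look roughly like the sketch below, which notifies on every failure but raises only when the expectation is blocking. This is an illustration under assumptions, not the actual operator code: handle_validation_result and the notify callback are hypothetical names, and it assumes the is_blocking value is available at the point where the exception is raised.

```python
import logging

try:
    from airflow.exceptions import AirflowException
except ImportError:
    # Stand-in so the sketch also runs where Airflow is not installed.
    class AirflowException(Exception):
        pass

log = logging.getLogger(__name__)


def handle_validation_result(success, message, is_blocking, notify):
    """Hypothetical helper: notify on failure, raise only if blocking.

    success     -- whether the validation passed
    message     -- text to send and to log
    is_blocking -- assumed to mirror the is_blocking expectation parameter
    notify      -- any callable that delivers the alert (e.g. a Slack sender)
    """
    if success:
        return True

    # Always send the alert, whether or not the failure is blocking.
    notify(message)

    if is_blocking:
        # Blocking failure: fail the task so downstream tasks do not run.
        raise AirflowException(message)

    # Non-blocking failure: record it and let the task finish successfully.
    log.warning("Non-blocking validation failure: %s", message)
    return False
```

With is_blocking set to False the task would then finish successfully and downstream tasks would be scheduled as usual. A separate option that leaves the operator untouched is to set trigger_rule="all_done" on the downstream tasks, so that they run regardless of the upstream task's final state.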