问问题
998 次
1 回答
0
我用一个变通办法解决了这个问题:编写一个函数,检查下一个执行日期是否是工作日,如果是则返回 True,否则返回 False。我在 ShortCircuitOperator 中调用该函数:返回 True 时继续执行下游任务,返回 False 时跳过它们。
下面是我的代码;如果有更好的解决方案,欢迎提出。
from __future__ import print_function
import pendulum
import logging
from airflow.models import DAG
from airflow.models import Variable
from datetime import datetime, timedelta
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
log = logging.getLogger(__name__)


def checktheday(**context):
    """Report whether the run's next execution date is a working day.

    Written to be used as the ``python_callable`` of a
    ``ShortCircuitOperator``: the boolean returned here is what that
    operator uses to decide whether downstream tasks continue.

    Args:
        **context: Task context passed as keyword arguments. Must contain
            ``next_execution_date``, an object exposing ``weekday()`` with
            ``datetime`` semantics (Monday == 0 ... Sunday == 6).

    Returns:
        bool: True when ``next_execution_date`` falls on Monday-Friday,
        False when it falls on Saturday or Sunday.

    Raises:
        KeyError: If ``next_execution_date`` is not present in the context.
    """
    next_execution_date = context['next_execution_date']
    log.info('next_execution_date is: %s', next_execution_date)

    date_check = next_execution_date.weekday()
    log.info('date_check is: %s', date_check)

    # weekday() numbers Monday..Friday as 0..4; 5 and 6 are the weekend.
    decision = 0 <= date_check <= 4
    log.info('decision is: %s', decision)
    return decision
# Timezone attached to the DAG's start_date below.
local_tz = pendulum.timezone("America/New_York")

# Default arguments handed to the DAG (see default_args=... on the DAG).
default_args = dict(
    owner='Managed Services',
    depends_on_past=False,
    start_date=datetime(2020, 8, 3, tzinfo=local_tz),
    # NOTE(review): dagrun_timeout is normally a DAG-level argument;
    # confirm it has any effect when supplied through default_args.
    dagrun_timeout=None,
    # Recipient address is read from the Airflow Variable named 'email'
    # when this module is evaluated.
    email=Variable.get('email'),
    email_on_failure=True,
    email_on_retry=False,
    # Lets python callables (e.g. checktheday) receive the task context.
    provide_context=True,
    # 12 retries spaced 5 minutes apart: roughly one hour of retrying.
    retries=12,
    retry_delay=timedelta(minutes=5),
)
# DAG 'execute_python': scheduled by the cron expression '0 6 * * *'
# (06:00 every day). A ShortCircuitOperator gates the remote job so that it
# only runs when checktheday reports a working day.
#
# Operators created inside the ``with DAG(...)`` block are attached to the
# DAG by the context manager, so no explicit ``dag=`` argument is needed.
with DAG(
    'execute_python',
    schedule_interval='0 6 * * *',
    default_args=default_args,
) as dag:
    start_dummy = DummyOperator(task_id='start')

    # NOTE(review): NONE_FAILED is presumably meant to let 'end' complete
    # when upstream tasks were skipped; whether it still runs after
    # weekdays_only short-circuits depends on the Airflow version - confirm.
    end_dummy = DummyOperator(
        task_id='end',
        trigger_rule=TriggerRule.NONE_FAILED,
    )

    # Gate: downstream tasks proceed only when checktheday returns True.
    weekdays_only = ShortCircuitOperator(
        task_id='weekdays_only',
        python_callable=checktheday,
    )

    # Runs the processing script over SSH. The templated -d argument is
    # ``ds`` plus one day, reformatted from YYYY-MM-DD to YYYYMMDD.
    run_python = SSHOperator(
        ssh_conn_id="oci_connection",
        task_id='run_python',
        command='/usr/bin/python3 /home/sb/local/bin/runProcess.py -d {{ macros.ds_format(macros.ds_add(ds, 1), "%Y-%m-%d", "%Y%m%d") }}',
    )

    start_dummy >> weekdays_only >> run_python >> end_dummy
于 2020-08-21T09:46:43.103 回答