我想将 Airflow 与 Presto 集成。在 bash shell 中,命令 `/opt/presto/bin/presto --server 10.0.0.15:8190 --catalog hive --schema cvm_db --execute "select * from cvm_db.cvm_weekly_rech limit 10"` 可以正常工作,但放到 Python 的 Airflow 脚本中,它会抛出 invalid syntax(语法无效)错误并高亮显示该命令。请问最好的做法是什么?下面是 Airflow 脚本。
import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta, date
# Default task arguments handed to the DAG defined below: failures are
# e-mailed to the owner, and a failed task is retried up to 5 times with a
# 10-minute wait between attempts.
# NOTE(review): days_ago(0) is presumably recomputed each time this file is
# parsed, so start_date moves forward every day. Airflow recommends a fixed
# start_date; with a weekly schedule a moving start may keep the DAG from ever
# running — confirm and consider a static datetime.
default_args = dict(
    owner='daasuser',
    depends_on_past=False,
    start_date=airflow.utils.dates.days_ago(0),
    email=['a.olabamidele@ligadata.com'],
    email_on_failure=True,
    retries=5,
    retry_delay=timedelta(minutes=10),
)
# Weekly DAG: the cron expression '0 5 * * 0' fires at 05:00 every Sunday.
# NOTE(review): the description talks about inserting records, but the only
# query task in this file runs a "select ... limit 10" — confirm intent.
dag = DAG(
    dag_id='cvm_weekly_datamart',
    description='To insert records into cvm datamart weekly',
    schedule_interval='0 5 * * 0',
    default_args=default_args,
)
# Shell command that runs a query through the Presto CLI. BashOperator's
# bash_command takes a Python string, so the whole command must be a string
# literal; writing it as bare tokens was a SyntaxError. The outer quotes are
# single so the SQL can keep its double quotes for the shell.
# NOTE(review): a previous version of this command targeted
# --server 10.0.0.15:8190 rather than 54.242.0.153:8180 — confirm which Presto
# coordinator the Airflow workers should reach.
presto_query_cmd = (
    '/opt/presto/bin/presto --server 54.242.0.153:8180 --catalog hive '
    '--schema cvm_db '
    '--execute "select * from cvm_db.cvm_weekly_rech limit 10"'
)

# First task of the DAG. The task_id is kept as-is (renaming it would change
# the task's identity), although the task runs a Presto query rather than
# printing a date.
t1 = BashOperator(
    task_id='print_date',
    bash_command=presto_query_cmd,
    dag=dag)
# Final task: echo a completion marker.
print1 = 'echo "Completed"'
t2 = BashOperator(
    task_id='print',
    bash_command=print1,
    dag=dag,
)

# Make t2 downstream of t1 (same effect as the bitshift form `t1 >> t2`).
t1.set_downstream(t2)