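I am trying to query Denodo through Airflow to list its databases. What I have done so far:
- Built a Dockerfile that installs Java and the other requirements
- Created a DAG to execute the script
- Configured an Airflow connection with the Conn Id, Conn Type (JDBC Connection), Connection URL, login details, Driver Class, and Driver Path
The task fails, and this is the log I get in Airflow: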
*** Reading local file: /opt/airflow/logs/denodo_example/select_query/2021-08-04T06:32:04.775406+00:00/1.log
[2021-08-04 06:32:05,912] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: denodo_example.select_query 2021-08-04T06:32:04.775406+00:00 [queued]>
[2021-08-04 06:32:05,921] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: denodo_example.select_query 2021-08-04T06:32:04.775406+00:00 [queued]>
[2021-08-04 06:32:05,921] {taskinstance.py:1067} INFO -
--------------------------------------------------------------------------------
[2021-08-04 06:32:05,921] {taskinstance.py:1068} INFO - Starting attempt 1 of 1
[2021-08-04 06:32:05,923] {taskinstance.py:1069} INFO -
--------------------------------------------------------------------------------
[2021-08-04 06:32:05,934] {taskinstance.py:1087} INFO - Executing <Task(PythonOperator): select_query> on 2021-08-04T06:32:04.775406+00:00
[2021-08-04 06:32:05,939] {standard_task_runner.py:52} INFO - Started process 640 to run task
[2021-08-04 06:32:05,942] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'denodo_example', 'select_query', '2021-08-04T06:32:04.775406+00:00', '--job-id', '14475', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/test.py', '--cfg-path', '/tmp/tmpja39uja5', '--error-file', '/tmp/tmpsv_d06ky']
[2021-08-04 06:32:05,943] {standard_task_runner.py:77} INFO - Job 14475: Subtask select_query
[2021-08-04 06:32:05,975] {logging_mixin.py:104} INFO - Running <TaskInstance: denodo_example.select_query 2021-08-04T06:32:04.775406+00:00 [running]> on host 6093b1cb6783
[2021-08-04 06:32:06,015] {taskinstance.py:1282} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=denodo_example
AIRFLOW_CTX_TASK_ID=select_query
AIRFLOW_CTX_EXECUTION_DATE=2021-08-04T06:32:04.775406+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-08-04T06:32:04.775406+00:00
[2021-08-04 06:32:06,246] {base.py:78} INFO - Using connection to: id: denodo_test. Host: jdbc:vdb://http://vpce-04c9e69adf77418bb-der51m2n.vpce-svc-07bf2027450b5bbe0.eu-central-1.vpce.amazonaws.com/:9999/distributed_tpcds?userAgent=jaydebeapi-ip-10-123-136-144, Port: None, Schema: , Login: user_nikitagupta, Password: ***, extra: {'extra__jdbc__drv_clsname': 'com.denodo.vdp.jdbc.Driver', 'extra__jdbc__drv_path': '/opt/airflow/denodo-vdp-jdbcdriver.jar'}
[2021-08-04 06:32:07,038] {local_task_job.py:151} INFO - Task exited with return code 1
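One thing that stands out in the log above is the Host value, which carries an http:// prefix and a slash before the port. As a minimal sketch, assuming the Denodo driver expects a URL of the form jdbc:vdb://<host>:<port>/<database> (worth confirming against the driver documentation), a hypothetical helper like build_denodo_jdbc_url could normalize the pieces before they go into the Connection URL field. The helper name and the example host are illustrative and are not part of Airflow or Denodo.

```python
from urllib.parse import urlsplit


def build_denodo_jdbc_url(host: str, port: int, database: str) -> str:
    """Return a URL shaped like jdbc:vdb://<host>:<port>/<database>.

    Assumption: this is the URL shape the Denodo JDBC driver expects.
    """
    # Tolerate a host pasted with a scheme, e.g. "http://example.com/".
    if "://" in host:
        host = urlsplit(host).hostname or ""
    # Drop stray slashes so the port is not preceded by "/".
    host = host.strip("/")
    if not host:
        raise ValueError("host must not be empty")
    return f"jdbc:vdb://{host}:{port}/{database}"


if __name__ == "__main__":
    # Hypothetical host; the port and database name are taken from the log.
    print(build_denodo_jdbc_url("http://example.com/", 9999, "distributed_tpcds"))
```

If the assumption holds, the value printed here is what would go into the Connection URL field in place of the one shown in the log.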
Here is my DAG code:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.jdbc.hooks.jdbc import JdbcHook

args = {
    'owner': 'airflow',
}


def func(jdbc_conn_id, sql, **kwargs):
    """Run a query over the JDBC connection and print the result as a DataFrame."""
    hook = JdbcHook(jdbc_conn_id=jdbc_conn_id)
    # get_pandas_df forwards extra keyword arguments to pandas.read_sql,
    # which has no autocommit parameter, so it is not passed here.
    df = hook.get_pandas_df(sql=sql)
    print(df.to_string())


with DAG(
    dag_id='denodo_example',
    default_args=args,
    schedule_interval='0 0 * * *',
    start_date=datetime(2017, 6, 1),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example'],
) as dag:
    # Tasks created inside the "with DAG" block are attached to the DAG automatically.
    select_query = PythonOperator(
        task_id='select_query',
        python_callable=func,
        op_kwargs={'jdbc_conn_id': 'denodo_test', 'sql': 'select 1'},
    )
    success = DummyOperator(
        task_id='success',
    )
    select_query >> success
What do I need to change to make the DAG run successfully?