1

我是 Cloud Composer 的新手,想在 Cloud Composer 的 Airflow 中使用 gcp_cloud_sql hook 执行一个 PostgreSQL SELECT 查询。我尝试过使用 CloudSqlQueryOperator,但它不适用于 SELECT 查询。

我想根据这个 SELECT 查询返回的结果来创建 DAG。但是,我连为这个 SELECT 查询建立一个简单的连接都做不到。

import os
from datetime import date, datetime, timedelta

import airflow
from airflow import models
from airflow.contrib.operators.gcp_sql_operator import (
    CloudSqlQueryOperator
)
from six.moves.urllib.parse import quote_plus

# Cloud SQL / PostgreSQL connection settings.
# Several values are placeholders and must be replaced before use.
GCP_PROJECT_ID = "adtech-dev"
GCP_REGION = "<my cluster zone>"
GCSQL_POSTGRES_INSTANCE_NAME_QUERY = "testpostgres"
# NOTE: the database name is empty here, so the URI path built from
# these settings will also be empty.
GCSQL_POSTGRES_DATABASE_NAME = ""
GCSQL_POSTGRES_USER = "<PostgreSQL User Name>"
GCSQL_POSTGRES_PASSWORD = "**********"
GCSQL_POSTGRES_PUBLIC_IP = "0.0.0.0"
GCSQL_POSTGRES_PUBLIC_PORT = "5432"

# The SELECT statement handed to the query operator.
rule_query = "select r.id from rules r where r.id = 1"

# Values substituted into the connection URI template. Every component
# except the port is percent-encoded so it can be embedded in a URI.
postgres_kwargs = {
    "user": quote_plus(GCSQL_POSTGRES_USER),
    "password": quote_plus(GCSQL_POSTGRES_PASSWORD),
    "public_port": GCSQL_POSTGRES_PUBLIC_PORT,
    "public_ip": quote_plus(GCSQL_POSTGRES_PUBLIC_IP),
    "project_id": quote_plus(GCP_PROJECT_ID),
    "location": quote_plus(GCP_REGION),
    "instance": quote_plus(GCSQL_POSTGRES_INSTANCE_NAME_QUERY),
    "database": quote_plus(GCSQL_POSTGRES_DATABASE_NAME),
}

# Arguments passed to the DAG as ``default_args``.
# NOTE(review): 'catchup' is normally a DAG-level setting rather than a
# per-task default -- confirm it has the intended effect here.
default_args = dict(
    owner='airflow',
    start_date=datetime(2018, 5, 31),
    email=['aniruddha.dwivedi@xyz.com'],
    email_on_failure=True,
    email_on_retry=False,
    depends_on_past=False,
    catchup=False,
    retries=3,
    retry_delay=timedelta(minutes=10),
)

# Publish the Cloud SQL connection as an environment variable. The
# variable name is the upper-cased connection id "proxy_postgres_tcp"
# prefixed with AIRFLOW_CONN_, and its value is a gcpcloudsql:// URI
# filled in from ``postgres_kwargs``.
# Requires ``import os`` at the top of the file; without it this
# statement raises NameError when the module is loaded.
os.environ['AIRFLOW_CONN_PROXY_POSTGRES_TCP'] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=True&"
    "sql_proxy_use_tcp=True"
).format(**postgres_kwargs)

# Connection ids to run the query against; one task is created per id.
connection_names = [
    "proxy_postgres_tcp",
]

# Every operator created below, in creation order.
tasks = []

# ``catchup`` is passed to the DAG itself: placing it only in
# ``default_args`` does not configure the DAG, because those defaults
# are applied to operators.
with models.DAG(
    dag_id='example_gcp_sql_query',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
) as dag:
    prev_task = None

    for connection_name in connection_names:
        # NOTE(review): per the accompanying question, this operator does
        # not expose SELECT results to the DAG; a hook-based task is
        # presumably needed to read rows -- confirm against the docs.
        task = CloudSqlQueryOperator(
            gcp_cloudsql_conn_id=connection_name,
            task_id="example_gcp_sql_task_" + connection_name,
            sql=rule_query,
        )
        tasks.append(task)
        # Chain tasks sequentially: each one runs after the previous one.
        if prev_task is not None:
            prev_task >> task
        prev_task = task
4

0 回答 0