10

我们有哪些方法可以从新推出的 Google Cloud Composer 连接到 Google Cloud SQL (MySQL) 实例?目的是将数据从 Cloud SQL 实例获取到 BigQuery(可能通过 Cloud Storage 进行中间步骤)。

  1. Cloud SQL 代理能否以某种方式暴露在 Kubernetes 集群托管 Composer 的 pod 上?

  2. 如果不能,可以使用 Kubernetes Service Broker 引入 Cloud SQL Proxy 吗?-> https://cloud.google.com/kubernetes-engine/docs/concepts/add-on/service-broker

  3. 是否应该使用 Airflow 来调度和调用 GCP API 命令,例如 1) 将 mysql 表导出到云存储 2) 将 mysql 导出读入 bigquery?

  4. 也许我还缺少其他方法来完成这项工作

4

6 回答 6

7

“Cloud SQL 代理提供对 Cloud SQL 第二代实例的安全访问,而无需将 IP 地址列入白名单或配置 SSL。” - Google CloudSQL-代理文档

CloudSQL 代理似乎是最推荐的连接到 CloudSQL 的方式。所以在 Composer 中,从 1.6.1 版本开始,我们可以创建一个新的 Kubernetes Pod 来运行 gcr.io/cloudsql-docker/gce-proxy:latest 镜像,通过服务公开它,然后在 Composer 中创建一个 Connection 来使用在运算符中。

要设置:

  • 遵循谷歌的文档

  • 使用来自Arik 的 Medium Post 的信息测试连接

    • 检查 pod 是否已创建kubectl get pods --all-namespaces

    • 检查服务是否已创建kubectl get services --all-namespaces

    • 跳转到工作节点kubectl --namespace=composer-1-6-1-airflow-1-10-1-<some-uid> exec -it airflow-worker-<some-uid> bash

      • 测试mysql连接mysql -u composer -p --host <service-name>.default.svc.cluster.local

笔记:

  • Composer 现在使用命名空间来组织 pod

  • 不同命名空间中的 Pod不会相互通信,除非你给它们完整的路径<k8-service-name>.<k8-namespace-name>.svc.cluster.local

  • 使用完整路径创建新的Composer 连接将启用成功连接

于 2019-05-20T23:13:41.667 回答
4

我们遇到了同样的问题,但使用的是 Postgres 实例。这就是我们所做的,并让它发挥作用:

  • 在运行气流的 Kubernetes 集群中创建一个 sqlproxy 部署。这是默认airflow_db连接使用的现有airflow-sqlproxy的副本,对部署文件进行了以下更改:

    • 用新的代理名称替换airflow-sqlproxy的所有实例
    • 在'spec:template:spec:containers:command:-instances'下编辑,用我们要连接的新实例替换现有实例名称
  • 创建一个 kubernetes 服务,再次作为现有气流-sqlproxy-service 的副本,并进行以下更改:

    • 用新的代理名称替换airflow-sqlproxy的所有实例
    • 在“规范:端口”下,更改为适当的端口(我们将 5432 用于 Postgres 实例)
  • 在气流 UI 中,添加一个 Postgres 类型的连接,并将主机设置为新创建的服务名称。

于 2019-02-06T13:12:11.367 回答
2

您可以按照这些说明在集群中启动新的 Cloud SQL 代理实例。

回复#3:这听起来是个不错的计划。据我所知,BigQuery 运算符没有 Cloud SQL,因此您必须像您描述的那样分两个阶段进行操作。

于 2018-05-17T21:10:41.597 回答
1

将来自@Leo 的评论中的中等帖子添加到顶级https://medium.com/@ariklevliber/connecting-to-gcp-composer-tasks-to-cloud-sql-7566350c5f53。一旦您阅读了该文章并进行了服务设置,您就可以使用 SQLAlchemy 从您的 DAG 进行连接,如下所示:

import os
from datetime import datetime, timedelta
import logging

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

logger = logging.getLogger(os.path.basename(__file__))
INSTANCE_CONNECTION_NAME = "phil-new:us-east1:phil-db"

default_args = {
    'start_date': datetime(2019, 7, 16)
}


def connect_to_cloud_sql():
    '''
        Create a connection to CloudSQL
    :return:
    '''
    import sqlalchemy
    try:
        PROXY_DB_URL = "mysql+pymysql://<user>:<password>@<cluster_ip>:3306/<dbname>"
        logger.info("DB URL", PROXY_DB_URL)
        engine = sqlalchemy.create_engine(PROXY_DB_URL, echo=True)
        for result in engine.execute("SELECT NOW() as now"):
            logger.info(dict(result))
    except Exception:
        logger.exception("Unable to interact with CloudSQL")


dag = DAG(
    dag_id="example_sqlalchemy",
    default_args=default_args,
    # schedule_interval=timedelta(minutes=5),
    catchup=False  # If you don't set this then the dag will run according to start date
)


t1 = PythonOperator(
    task_id="example_sqlalchemy",
    python_callable=connect_to_cloud_sql,
    dag=dag
)


if __name__ == "__main__":
    connect_to_cloud_sql()
于 2019-07-19T12:39:44.243 回答
0

现在我们无需自己创建云代​​理即可连接到 Cloud SQL。操作员将自动创建它。代码如下所示:

from airflow.models import DAG
from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceExportOperator

export_body = {
    'exportContext': {
        'fileType': 'CSV',
        'uri': EXPORT_URI,
        'databases': [DB_NAME],
        'csvExportOptions': {
            'selectQuery': SQL
        }
    }
}

default_dag_args = {}

with DAG(
        'postgres_test',
        schedule_interval='@once',
        default_args=default_dag_args) as dag:

    sql_export_task = CloudSqlInstanceExportOperator(
        project_id=GCP_PROJECT_ID,
        body=export_body,
        instance=INSTANCE_NAME,
        task_id='sql_export_task'
    )
于 2019-10-24T04:48:21.783 回答
0

在这里,在 Hoffa 对类似问题的回答中,您可以找到有关 Wepay 如何使用 Airflow 运营商保持每 15 分钟同步一次的参考资料。

从所说的答案:

看看WePay是如何做到这一点的:

MySQL 到 GCS 运算符对 MySQL 表执行 SELECT 查询。SELECT 提取所有大于(或等于)最后一个高水位线的数据。高水位线要么是表的主键(如果表是仅追加的),要么是修改时间戳列(如果表接收到更新)。同样,SELECT 语句也可以回溯一点时间(或行),以捕获上一个查询中可能删除的行(由于上述问题)。

借助 Airflow,他们设法使 BigQuery 每 15 分钟与他们的 MySQL 数据库同步一次。

于 2018-06-18T07:29:14.207 回答