2

我正在尝试在 Google Cloud Composer 中运行 DAG,其中第一个组件是使用 http GET 请求调用 API,然后使用 python-client 库将 json 插入 BigQuery 表中。我正在尝试运行此功能:https ://googlecloudplatform.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.client.Client.insert_rows_json.html

import requests
import datetime
import ast
import numpy as np
from airflow import models
from airflow.contrib.operators import bigquery_operator
from airflow.operators import python_operator
import google.cloud.bigquery as bigquery

client = bigquery.Client(project = 'is-flagship-data-api-sand')
dataset_id = 'Mobile_Data_Test'
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table('sample_wed')
table = client.get_table(table_ref)

def get_localytics_data():
    profiles_requests_command = "https://%s:%s@api.localytics.com/v1/exports/profiles/%d/profile"%(api_key,api_secret,28761)
    res_profiles = requests.get(profiles_requests_command)
    if res_profiles.status_code == 200:
        data = res_profiles.content
        data_split = data.split('\n')[:-1]
        data_split_ast = [ast.literal_eval(x) for x in data_split]

        #take out characters from the beginning to have neat columns
        data_split_ast_pretty = [dict(zip(map(lambda x: x[4:], item.keys()), item.values())) for item in data_split_ast]


        #add current date
        current_time = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
        for item in data_split_ast_pretty:
            item['DateCreated'] = current_time


        random_sample = list(np.random.choice(data_split_ast_pretty,5))  
        print random_sample
        client.insert_rows_json(table = table, json_rows = random_sample)
    else:
        pass




run_api = python_operator.PythonOperator(task_id='call_api',
        python_callable=get_localytics_data)

我添加了 PYPI 包:

请求 ===2.19.1

numpy ===1.12.0

谷歌云大查询 ===1.4.0

我收到以下错误: Broken DAG: [/home/airflow/gcs/dags/composer_test_july30_v2.py] 'Client' 对象 在 Airflow UI 控制台中没有属性 'get_table' 。

显示的所有代码都可以在本地工作,但无法使用 Cloud Composer。

4

2 回答 2

0

听起来你有一个过时的google-cloud-bigquery包,虽然它看起来不应该。

为了确定,它需要通过 SSH 连接到 Composer 环境的 Google Kubernetes Engine (GKE) 集群并运行pip freeze | grep bigquery以找出安装的实际版本。

  1. 转到https://console.cloud.google.com/kubernetes/list
  2. 找到对应的GKE集群,点击。
  3. 点击顶部的连接。
  4. 一旦进入控制台类型kubectl get pods。应该会出现一个 pod 列表。
  5. 输入kubectl exec -it <AIRFLOW_WORKER> /bin/bash以airflow-worker-* 开头的pod 的位置。
  6. 进入 pod 后,输入pip freeze | grep bigquery,它应该会显示模块的版本。
于 2018-07-31T18:25:32.120 回答
0

Google 现在在 Composer 图像页面上发布 Python 包/版本https://cloud.google.com/composer/docs/concepts/versioning/composer-versions只需找到您正在使用的版本的行,然后展开 Packages 列.

这是迄今为止获取软件包版本的最简单方法,因为pip freezeAirflow 工作人员现在会生成一个引用轮子文件位置而不是版本号的列表。例如:

airflow@airflow-worker-*****:~$ pip freeze
absl-py @ file:///usr/local/lib/airflow-pypi-dependencies-2.1.4/python3.8/absl_py-1.0.0-py3-none-any.whl
alembic @ file:///usr/local/lib/airflow-pypi-dependencies-2.1.4/python3.8/alembic-1.7.1-py3-none-any.whl
amqp @ file:///usr/local/lib/airflow-pypi-dependencies-2.1.4/python3.8/amqp-2.6.1-py2.py3-none-any.whl
anyio @ file:///usr/local/lib/airflow-pypi-dependencies-2.1.4/python3.8/anyio-3.3.1-py3-none-any.whl
etc....
于 2022-01-13T03:38:38.180 回答