I am trying to run a DAG in Google Cloud Composer in which the first component calls an API with an HTTP GET request and then uses the Python client library to insert the JSON into a BigQuery table. I am trying to run this function: https://googlecloudplatform.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.client.Client.insert_rows_json.html
import requests
import datetime
import ast
import numpy as np
from airflow import models
from airflow.contrib.operators import bigquery_operator
from airflow.operators import python_operator
import google.cloud.bigquery as bigquery

client = bigquery.Client(project = 'is-flagship-data-api-sand')
dataset_id = 'Mobile_Data_Test'
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table('sample_wed')
table = client.get_table(table_ref)

def get_localytics_data():
    profiles_requests_command = "https://%s:%s@api.localytics.com/v1/exports/profiles/%d/profile"%(api_key,api_secret,28761)
    res_profiles = requests.get(profiles_requests_command)
    if res_profiles.status_code == 200:
        data = res_profiles.content
        data_split = data.split('\n')[:-1]
        data_split_ast = [ast.literal_eval(x) for x in data_split]
        # take out characters from the beginning to have neat columns
        data_split_ast_pretty = [dict(zip(map(lambda x: x[4:], item.keys()), item.values())) for item in data_split_ast]
        # add current date
        current_time = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
        for item in data_split_ast_pretty:
            item['DateCreated'] = current_time
        random_sample = list(np.random.choice(data_split_ast_pretty,5))
        print random_sample
        client.insert_rows_json(table = table, json_rows = random_sample)
    else:
        pass

run_api = python_operator.PythonOperator(task_id='call_api',
                                         python_callable=get_localytics_data)
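To make the parsing step concrete, here is a minimal standalone sketch of what the key-cleaning part of get_localytics_data is meant to do. The helper name clean_records and the sample field names are made up for illustration, since the real Localytics payload is not shown here. Unlike the code above, it skips blank lines instead of slicing off the last element.

```python
import ast


def clean_records(raw_text, prefix_len=4):
    """Parse one dict literal per line and drop the first prefix_len characters of each key."""
    # Ignore empty lines, such as the one left by a trailing newline.
    lines = [line for line in raw_text.split("\n") if line.strip()]
    records = [ast.literal_eval(line) for line in lines]
    return [
        {key[prefix_len:]: value for key, value in record.items()}
        for record in records
    ]


# Hypothetical payload: these field names are assumptions, not real API output.
sample = "{'abc_name': 'Ann', 'abc_age': 31}\n{'abc_name': 'Bob', 'abc_age': 27}\n"
print(clean_records(sample))
```

Each cleaned dictionary can then have the DateCreated field added before it is passed to insert_rows_json.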
I added the following PyPI packages:
requests===2.19.1
numpy===1.12.0
google-cloud-bigquery===1.4.0
I get the following error in the Airflow UI console: Broken DAG: [/home/airflow/gcs/dags/composer_test_july30_v2.py] 'Client' object has no attribute 'get_table'.
All of the code shown works locally, but it does not work in Cloud Composer.
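One thing that might help narrow this down is checking which copy of the BigQuery library the Composer environment actually imports. The sketch below is a hypothetical helper (describe_client is not part of any library) that reports where a module is loaded from, its version if it exposes `__version__`, and whether the class has the attribute in question. It assumes only the standard library, and the default arguments simply mirror the names in the error message.

```python
import importlib


def describe_client(module_name="google.cloud.bigquery",
                    class_name="Client",
                    attr="get_table"):
    """Report where a module is loaded from and whether its class has an attribute."""
    try:
        module = importlib.import_module(module_name)
    except ImportError as exc:
        # The module is not installed (or not importable) in this environment.
        return {"importable": False, "error": str(exc)}
    cls = getattr(module, class_name, None)
    return {
        "importable": True,
        "path": getattr(module, "__file__", None),
        # Assumption: the module exposes __version__; None is reported otherwise.
        "version": getattr(module, "__version__", None),
        "has_attr": cls is not None and hasattr(cls, attr),
    }


if __name__ == "__main__":
    print(describe_client())
```

Running this both locally and inside a task in Composer, then comparing the reported path and version, would show whether the two environments are loading the same release of the library.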