We use Google BigQuery via the Python API. How can I create a table (a new one, or overwriting an old one) from query results? I went through the query documentation, but I didn't find it useful.
We want to simulate:
"SELECT ... INTO ..." from ANSI SQL.
You can do this by specifying a destination table in the query. You will need to use the Jobs.insert API rather than the Jobs.query call, and you should specify writeDisposition=WRITE_APPEND and fill out the destination table.
Here's what the configuration would look like if you were using the raw API. If you're using Python, the Python client should provide accessors for these same fields:
"configuration": {
"query": {
"query": "select count(*) from foo.bar",
"destinationTable": {
"projectId": "my_project",
"datasetId": "my_dataset",
"tableId": "my_table"
},
"createDisposition": "CREATE_IF_NEEDED",
"writeDisposition": "WRITE_APPEND",
}
}
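If you want to submit that raw configuration from Python yourself, a minimal sketch using the discovery-based google-api-python-client might look like the following. This assumes application default credentials are set up; my_project, my_dataset, and my_table are the placeholders from the configuration above.

from googleapiclient.discovery import build

# Build a BigQuery v2 service object (uses application default credentials).
service = build('bigquery', 'v2')

# Submit the configuration shown above via Jobs.insert.
job = service.jobs().insert(
    projectId='my_project',  # Placeholder project ID.
    body={
        'configuration': {
            'query': {
                'query': 'select count(*) from foo.bar',
                'destinationTable': {
                    'projectId': 'my_project',
                    'datasetId': 'my_dataset',
                    'tableId': 'my_table',
                },
                'createDisposition': 'CREATE_IF_NEEDED',
                'writeDisposition': 'WRITE_APPEND',
            }
        }
    },
).execute()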
The accepted answer is correct, but it does not provide Python code to perform the task. Here is an example, refactored from a small custom client class I just wrote. It does not handle exceptions, and the hard-coded query should be customized to do something more interesting than just SELECT * ...
from google.cloud import bigquery


class Client(object):
    def __init__(self, origin_project, origin_dataset, origin_table,
                 destination_dataset, destination_table):
        """
        A Client that performs a hardcoded SELECT and INSERTS the results in a
        user-specified location.

        All init args are strings. Note that the destination project is the
        default project from your Google Cloud configuration.
        """
        self.project = origin_project
        self.dataset = origin_dataset
        self.table = origin_table
        self.dest_dataset = destination_dataset
        self.dest_table_name = destination_table
        self.client = bigquery.Client()

    def run(self):
        query = ("SELECT * FROM `{project}.{dataset}.{table}`;".format(
            project=self.project, dataset=self.dataset, table=self.table))

        job_config = bigquery.QueryJobConfig()

        # Set configuration.query.destinationTable
        destination_dataset = self.client.dataset(self.dest_dataset)
        destination_table = destination_dataset.table(self.dest_table_name)
        job_config.destination = destination_table

        # Set configuration.query.createDisposition
        job_config.create_disposition = 'CREATE_IF_NEEDED'

        # Set configuration.query.writeDisposition
        job_config.write_disposition = 'WRITE_APPEND'

        # Start the query
        job = self.client.query(query, job_config=job_config)

        # Wait for the query to finish
        job.result()
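For completeness, here is how the class above might be invoked; all the project, dataset, and table names below are placeholders:

# Placeholder names; substitute your own project/dataset/table IDs.
client = Client(
    origin_project='my_project',
    origin_dataset='my_dataset',
    origin_table='my_table',
    destination_dataset='my_dest_dataset',
    destination_table='my_dest_table',
)
client.run()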
Creating a table from query results in Google BigQuery. The following steps are explained assuming you are using a Jupyter Notebook with Python 3:
Create a new dataset on BQ: my_dataset

from google.cloud import bigquery

bigquery_client = bigquery.Client()  # Create a BigQuery service object
dataset_id = 'my_dataset'
dataset_ref = bigquery_client.dataset(dataset_id)  # Create a DatasetReference using a chosen dataset ID.
dataset = bigquery.Dataset(dataset_ref)  # Construct a full Dataset object to send to the API.
# Specify the geographic location where the new dataset will reside.
# Remember this should be the same location as the source dataset we are
# querying data from.
dataset.location = 'US'
# Send the dataset to the API for creation. Raises
# google.api_core.exceptions.AlreadyExists if the Dataset already exists
# within the project.
dataset = bigquery_client.create_dataset(dataset)  # API request
print('Dataset {} created.'.format(dataset.dataset_id))
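If you re-run the notebook, create_dataset will raise an error because the dataset already exists. A minimal sketch of tolerating re-runs, assuming the google.api_core.exceptions error classes (AlreadyExists is a subclass of Conflict, the HTTP 409 error):

from google.api_core.exceptions import Conflict

try:
    dataset = bigquery_client.create_dataset(dataset)  # API request
    print('Dataset {} created.'.format(dataset.dataset_id))
except Conflict:
    # The dataset already exists within the project; reuse it.
    print('Dataset {} already exists.'.format(dataset_id))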
Run a query on BQ using Python:
There are 2 types here:
I am using the public dataset bigquery-public-data:hacker_news with table id comments here to run the query.
DestinationTableName='table_id1'  # Enter the new table name you want to give
!bq query --allow_large_results --destination_table=project_id:my_dataset.$DestinationTableName 'SELECT * FROM [bigquery-public-data:hacker_news.comments]'
This query allows large query results if required.
DestinationTableName='table_id2'  # Enter the new table name you want to give
!bq query --destination_table=project_id:my_dataset.$DestinationTableName 'SELECT * FROM [bigquery-public-data:hacker_news.comments] LIMIT 100'
This works for queries whose results do not exceed the limit mentioned in the Google BQ documentation.
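For reference, a rough Python-API equivalent of the first bq command above, using the same google-cloud-bigquery client as the earlier answers (project_id, my_dataset, and table_id1 are the placeholders from this answer):

from google.cloud import bigquery

client = bigquery.Client()
job_config = bigquery.QueryJobConfig()
job_config.use_legacy_sql = True       # The [project:dataset.table] syntax is legacy SQL.
job_config.allow_large_results = True  # Mirrors --allow_large_results; requires a destination table.
job_config.destination = client.dataset('my_dataset').table('table_id1')

query_job = client.query(
    'SELECT * FROM [bigquery-public-data:hacker_news.comments]',
    job_config=job_config,
)
query_job.result()  # Wait for the query job to finish.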