拥有一个 GAE 数据存储类型,其中包含数十万个对象。想做几个涉及到的查询(涉及计数查询)。Big Query 似乎很适合这样做。
目前是否有一种使用 Big Query 查询实时 AppEngine Datastore 的简单方法?
拥有一个 GAE 数据存储类型,其中包含数十万个对象。想做几个涉及到的查询(涉及计数查询)。Big Query 似乎很适合这样做。
目前是否有一种使用 Big Query 查询实时 AppEngine Datastore 的简单方法?
您不能直接在 DataStore 实体上运行 BigQuery,但您可以编写一个 Mapper Pipeline,从 DataStore 中读取实体,将它们写入 Google Cloud Storage 中的 CSV,然后将它们摄取到 BigQuery - 您甚至可以自动化该过程。以下是仅将Mapper API类用于 DataStore 到 CSV 步骤 的示例:
import re
import time
from datetime import datetime
import urllib
import httplib2
import pickle
from google.appengine.ext import blobstore
from google.appengine.ext import db
from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.ext.webapp import blobstore_handlers
from google.appengine.ext.webapp import util
from google.appengine.ext.webapp import template
from mapreduce.lib import files
from google.appengine.api import taskqueue
from google.appengine.api import users
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op
from apiclient.discovery import build
from google.appengine.api import memcache
from oauth2client.appengine import AppAssertionCredentials
#Number of shards to use in the Mapper pipeline
SHARDS = 20
# Name of the project's Google Cloud Storage Bucket
GS_BUCKET = 'your bucket'
# DataStore Model
class YourEntity(db.Expando):
field1 = db.StringProperty() # etc, etc
ENTITY_KIND = 'main.YourEntity'
class MapReduceStart(webapp.RequestHandler):
"""Handler that provides link for user to start MapReduce pipeline.
"""
def get(self):
pipeline = IteratorPipeline(ENTITY_KIND)
pipeline.start()
path = pipeline.base_path + "/status?root=" + pipeline.pipeline_id
logging.info('Redirecting to: %s' % path)
self.redirect(path)
class IteratorPipeline(base_handler.PipelineBase):
""" A pipeline that iterates through datastore
"""
def run(self, entity_type):
output = yield mapreduce_pipeline.MapperPipeline(
"DataStore_to_Google_Storage_Pipeline",
"main.datastore_map",
"mapreduce.input_readers.DatastoreInputReader",
output_writer_spec="mapreduce.output_writers.FileOutputWriter",
params={
"input_reader":{
"entity_kind": entity_type,
},
"output_writer":{
"filesystem": "gs",
"gs_bucket_name": GS_BUCKET,
"output_sharding":"none",
}
},
shards=SHARDS)
def datastore_map(entity_type):
props = GetPropsFor(entity_type)
data = db.to_dict(entity_type)
result = ','.join(['"%s"' % str(data.get(k)) for k in props])
yield('%s\n' % result)
def GetPropsFor(entity_or_kind):
if (isinstance(entity_or_kind, basestring)):
kind = entity_or_kind
else:
kind = entity_or_kind.kind()
cls = globals().get(kind)
return cls.properties()
application = webapp.WSGIApplication(
[('/start', MapReduceStart)],
debug=True)
def main():
run_wsgi_app(application)
if __name__ == "__main__":
main()
如果将此附加到 IteratorPipeline 类的末尾:yield CloudStorageToBigQuery(output)
,则可以将生成的 csv 文件句柄通过管道传输到 BigQuery 摄取管道中……如下所示:
class CloudStorageToBigQuery(base_handler.PipelineBase):
"""A Pipeline that kicks off a BigQuery ingestion job.
"""
def run(self, output):
# BigQuery API Settings
SCOPE = 'https://www.googleapis.com/auth/bigquery'
PROJECT_ID = 'Some_ProjectXXXX'
DATASET_ID = 'Some_DATASET'
# Create a new API service for interacting with BigQuery
credentials = AppAssertionCredentials(scope=SCOPE)
http = credentials.authorize(httplib2.Http())
bigquery_service = build("bigquery", "v2", http=http)
jobs = bigquery_service.jobs()
table_name = 'datastore_dump_%s' % datetime.utcnow().strftime(
'%m%d%Y_%H%M%S')
files = [str(f.replace('/gs/', 'gs://')) for f in output]
result = jobs.insert(projectId=PROJECT_ID,
body=build_job_data(table_name,files)).execute()
logging.info(result)
def build_job_data(table_name, files):
return {"projectId": PROJECT_ID,
"configuration":{
"load": {
"sourceUris": files,
"schema":{
# put your schema here
"fields": fields
},
"destinationTable":{
"projectId": PROJECT_ID,
"datasetId": DATASET_ID,
"tableId": table_name,
},
}
}
}
使用新的(从 2013 年 9 月起)流插入 API,您可以将记录从您的应用程序导入 BigQuery。
数据会立即在 BigQuery 中可用,因此应该可以满足您的实时需求。
虽然这个问题现在有点老了,但对于任何遇到这个问题的人来说,这可能是一个更简单的解决方案
目前,虽然从本地开发服务器上让它工作充其量是不完整的。
我们正在做一个 Trusted Tester 程序,通过两个简单的操作从 Datastore 迁移到 BigQuery:
它会自动为您处理架构。
对于 BigQuery,您必须将这些 Kind 导出到 CSV 或分隔记录结构中,加载到 BigQuery 中,然后您就可以查询了。据我所知,没有任何设施可以查询实时 GAE 数据存储。
Biquery 是分析查询引擎,这意味着您无法更改记录。不允许更新或删除,只能追加。
不,BigQuery 是一种不同的产品,需要将数据上传到其中。它不能在数据存储上工作。您可以使用 GQL 查询数据存储。
截至 2016 年,现在这很有可能!您必须执行以下操作:
有关此工作流程的完整示例,请参阅这篇文章!