我正在按照这个 Codelab 操作,它演示了如何通过搭建 MapReduce 管道,把 Google App Engine 数据存储(Datastore)中的数据经由 Google Cloud Storage 导入 BigQuery。我创建了一个 Google App Engine 数据存储实体,并用一个处理程序收集与某些股票相关的推文,打算用这些数据做测试。我认为自己已经照着示例完成了其中列出的所有步骤,但负责拆分数据并将其写入 Cloud Storage 的分片(shard)不断抛出 UnicodeEncodeError。以下是我在开发应用服务器(dev_appserver)上测试该应用时的日志:
INFO 2012-12-18 20:41:07,645 dev_appserver.py:3103] "POST /mapreduce/worker_callback HTTP/1.1" 500 -
WARNING 2012-12-18 20:41:07,648 taskqueue_stub.py:1981] Task appengine-mrshard-1582400592541472B07B9-0-0 failed to execute. This task will retry in 0.100 seconds
ERROR 2012-12-18 20:41:09,453 webapp2.py:1552] 'ascii' codec can't encode character u'\u2019' in position 80: ordinal not in range(128)
Traceback (most recent call last):
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1535, in __call__
rv = self.handle_exception(request, response, e)
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1529, in __call__
rv = self.router.dispatch(request, response)
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1278, in default_dispatcher
return route.handler_adapter(request, response)
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1102, in __call__
return handler.dispatch()
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 572, in dispatch
return self.handle_exception(e, self.app.debug)
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 570, in dispatch
return method(*args, **kwargs)
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\base_handler.py", line 65, in post
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\handlers.py", line 181, in handle
entity, input_reader, ctx, tstate)
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\handlers.py", line 298, in process_data
output_writer.write(output, ctx)
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\output_writers.py", line 659, in write
ctx.get_pool("file_pool").append(self._filename, str(data))
UnicodeEncodeError: 'ascii' codec can't encode character u'\u2019' in position 80: ordinal not in range(128)
import json
import webapp2
import urllib2
import time
import calendar
import datetime
import httplib2
from google.appengine.ext import db
from google.appengine.api import taskqueue
from google.appengine.ext import blobstore
from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.ext.webapp import blobstore_handlers
from google.appengine.ext.webapp import util
from google.appengine.ext.webapp import template
from google.appengine.api import urlfetch
from mapreduce.lib import files
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from apiclient.discovery import build
from oauth2client.appengine import AppAssertionCredentials
# OAuth scope requested when building the BigQuery API credentials.
SCOPE = 'https://www.googleapis.com/auth/bigquery'
PROJECT_ID = 'project_id' # Your Project ID here
# BigQuery dataset id used for the load job's destination table.
BQ_DATASET_ID = 'datastore_data'
# Cloud Storage bucket name handed to the mapper's output writer.
GS_BUCKET = 'bucketname'
# Datastore kind handed to DatastoreMapperPipeline; presumably
# "<module>.<model class>" for the streamdata model -- confirm module name.
ENTITY_KIND = 'main.streamdata'
# Datastore model holding one collected tweet for a stock ticker.
# NOTE(review): indentation of the class body was lost in this paste.
class streamdata(db.Model):
# Set automatically when the entity is created (auto_now_add).
querydate = db.DateTimeProperty(auto_now_add = True)
# Ticker symbol the search was issued for (set by StreamHandler).
ticker = db.StringProperty()
# 'created_at' value copied from the search result entry, kept as a string.
created_at = db.StringProperty()
# 'id_str' value copied from the search result entry.
tweet_id = db.StringProperty()
# Tweet body; may contain non-ASCII characters (see the u'\u2019' in the log).
text = db.TextProperty()
# Origin label; StreamHandler always stores "Twitter".
source = db.StringProperty()
# Pipeline that runs a MapperPipeline over the given datastore kind and then
# yields CloudStorageToBigQuery with the mapper's output.
# NOTE(review): the MapperPipeline(...) call below is never closed and its
# parameter entries have no enclosing braces -- several lines appear to be
# missing from this paste (mapper/reader/writer specs, params dict, parens).
class DatastoreMapperPipeline(base_handler.PipelineBase):
def run(self, entity_type):
output = yield mapreduce_pipeline.MapperPipeline(
# Job name shown for this mapper run.
"Datastore Mapper %s" % entity_type,
"entity_kind": entity_type,
# Output goes to Google Cloud Storage ("gs") in the GS_BUCKET bucket.
"filesystem": "gs",
"gs_bucket_name": GS_BUCKET,
# Chain the load step, passing along whatever the mapper stage produced.
yield CloudStorageToBigQuery(output)
# Pipeline stage that submits a BigQuery job for the files produced by the
# mapper stage.
# NOTE(review): the strftime( and jobs.insert( calls below are not closed --
# their remaining arguments appear to be missing from this paste.
class CloudStorageToBigQuery(base_handler.PipelineBase):
def run(self, csv_output):
# Authorize an HTTP client with the app's own service credentials.
credentials = AppAssertionCredentials(scope=SCOPE)
http = credentials.authorize(httplib2.Http())
bigquery_service = build("bigquery", "v2", http=http)
jobs = bigquery_service.jobs()
# Table name is "datastore_data_" plus a UTC timestamp string.
table_name = 'datastore_data_%s' % datetime.datetime.utcnow().strftime(
# Rewrite each "/gs/..." output path as a "gs://..." URI.
# NOTE(review): this local name shadows the imported `files` module.
files = [str(f.replace('/gs/', 'gs://')) for f in csv_output]
result = jobs.insert(projectId=PROJECT_ID,
# Build the request body dict for a BigQuery job from a table name and a list
# of source URIs.
# NOTE(review): the dict literal is incomplete in this paste (schema fields,
# destination-table wrapper and closing braces are missing), so the exact
# nesting of the keys below cannot be confirmed from here.
def build_job_data(table_name, files):
return {"projectId": PROJECT_ID,
"load": {
"sourceUris": files,
# One schema field entry; the surrounding schema structure is not visible.
{ "name":"text",
"projectId": PROJECT_ID,
"datasetId": BQ_DATASET_ID,
"tableId": table_name,
# Zero tolerated bad records.
"maxBadRecords": 0,
# Mapper function: turn one datastore entity into a single CSV line.
# Despite its name, `entity_type` is the entity instance being mapped (it is
# passed to db.to_dict).
# NOTE(review): the resultlist literal is cut off after its first element in
# this paste; the remaining fields are not visible.
def datastore_map(entity_type):
data = db.to_dict(entity_type)
# First column is querydate converted to POSIX seconds.
resultlist = [timestamp_to_posix(data.get('querydate')),
# Wrap every field in double quotes and join with commas.
result = ','.join(['"%s"' % field for field in resultlist])
# NOTE(review): if any field is a unicode string with non-ASCII characters
# (e.g. u'\u2019' in a tweet), this yields a unicode object; the traceback
# shows the output writer calling str(data) on it, which raises
# UnicodeEncodeError. Encoding the row to UTF-8 bytes before yielding would
# likely avoid that -- confirm against the output writer in use.
yield("%s\n" % result)
def timestamp_to_posix(timestamp):
    """Convert a UTC datetime into integer seconds since the Unix epoch.

    Datastore ``DateTimeProperty`` values are naive datetimes expressed in
    UTC. ``time.mktime`` would interpret the time tuple as *local* time and
    shift the result by the host's UTC offset, so ``calendar.timegm`` is
    used instead; it always treats the tuple as UTC.

    Args:
        timestamp: A ``datetime.datetime``. Naive values are taken to be
            UTC; aware values are converted to UTC first.

    Returns:
        The POSIX timestamp as an ``int`` (sub-second precision dropped).
    """
    # utctimetuple() equals timetuple() for naive datetimes and normalizes
    # aware ones to UTC, so one code path covers both.
    return int(calendar.timegm(timestamp.utctimetuple()))
# Request handler mapped to '/start': builds the datastore-to-BigQuery
# pipeline for ENTITY_KIND.
# NOTE(review): the visible lines never start the pipeline and never use
# `path`; the start/redirect lines appear to be missing from this paste.
class DatastoretoBigQueryStart(webapp2.RequestHandler):
def get(self):
pipeline = DatastoreMapperPipeline(ENTITY_KIND)
# URL of the pipeline status page for this pipeline's id.
path = pipeline.base_path + "/status?root=" + pipeline.pipeline_id
# Request handler mapped to '/add_data': queries the Twitter search API once
# per ticker symbol and copies each result into a streamdata entity.
# NOTE(review): no put() call on the entity is visible; the tail of the loop
# appears to be missing from this paste.
class StreamHandler(webapp2.RequestHandler):
def get(self):
# Ticker symbols to search for.
tickers = ['AAPL','GOOG', 'IBM', 'BAC', 'INTC',
'DELL', 'C', 'JPM', 'WFM', 'WMT',
'AMZN', 'HOT', 'SPG', 'SWY', 'HTSI',
'DUK', 'CEG', 'XOM', 'F', 'WFC',
'CSCO', 'UAL', 'LUV', 'DAL', 'COST', 'YUM',
'TLT', 'HYG', 'JNK', 'LQD', 'MSFT',
'GE', 'LVS', 'MGM', 'TWX', 'DIS', 'CMCSA',
'TWC', 'ORCL', 'WPO', 'NYT', 'GM', 'JCP',
'SPLS', 'BBY', 'BBBY', 'YHOO', 'MAR',
'L', 'LOW', 'HD', 'HOV', 'TOL', 'NVR', 'RYL',
'GIS', 'K', 'POST', 'KRFT', 'CHK', 'GGP',
'RSE', 'RWT', 'AIG', 'CB', 'BRK.A', 'CAT']
# set() drops duplicate symbols; iteration order is therefore arbitrary.
for i in set(tickers):
url = 'http://search.twitter.com/search.json?q='
resultcount = '&rpp=100'
language = '&lang=en'
# '%40%24' is the percent-encoded form of '@$', prefixed to the ticker.
encoding = '%40%24'
tickerstring = url + encoding + i + resultcount + language
tickurl = urllib2.Request(tickerstring)
tweets = urllib2.urlopen(tickurl)
code = tweets.getcode()
# Only parse the body on HTTP 200; other status codes are skipped here.
if code == 200:
# Second positional argument is the encoding used to decode the payload.
results = json.load(tweets, 'utf-8')
if "results" in results:
entries = results["results"]
for entry in entries:
# Copy the fields of interest from the search result into a new entity.
tweet = streamdata()
created = entry['created_at']
tweetid = entry['id_str']
tweettxt = entry['text']
tweet.ticker = i
tweet.created_at = created
tweet.tweet_id = tweetid
tweet.text = tweettxt
tweet.source = "Twitter"
class MainHandler(webapp2.RequestHandler):
    """Landing page offering links to the pipeline and data-ingest handlers."""

    def get(self):
        """Write the two navigation links, in order, to the response body."""
        link_snippets = (
            '<a href="/start">Click here</a> to start the Datastore to BigQuery pipeline. ',
            '<a href="/add_data">Click here</a> to start adding data to the datastore. ',
        )
        writer = self.response.out
        for snippet in link_snippets:
            writer.write(snippet)
# WSGI application with the URL routes: landing page, pipeline start, and
# tweet collection.
# NOTE(review): the WSGIApplication( call is not closed in this paste; any
# trailing arguments are not visible.
app = webapp2.WSGIApplication([
('/', MainHandler),
('/start', DatastoretoBigQueryStart),
('/add_data', StreamHandler)],