我正在尝试跟随这个 Codelab,它向您展示如何从您的 Google App Engine 数据存储中获取数据,并通过设置 MapReduce 管道将其通过 Google Cloud Storage 移动到 BigQuery。我设置了一个 Google App Engine 数据存储实体,并有一个流程来收集关于某些股票的推文,我想收集这些数据作为测试。我相信我已经按照示例中概述的所有内容进行了操作,但是完成分解数据并将其加载到 Cloud Storage 的所有工作的分片正在引发 UnicodeEncodeErrors。这是我在开发应用服务器上测试应用程序的日志:
INFO 2012-12-18 20:41:07,645 dev_appserver.py:3103] "POST /mapreduce/worker_callback HTTP/1.1" 500 -
WARNING 2012-12-18 20:41:07,648 taskqueue_stub.py:1981] Task appengine-mrshard-1582400592541472B07B9-0-0 failed to execute. This task will retry in 0.100 seconds
ERROR 2012-12-18 20:41:09,453 webapp2.py:1552] 'ascii' codec can't encode character u'\u2019' in position 80: ordinal not in range(128)
Traceback (most recent call last):
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1535, in __call__
rv = self.handle_exception(request, response, e)
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1529, in __call__
rv = self.router.dispatch(request, response)
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1278, in default_dispatcher
return route.handler_adapter(request, response)
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1102, in __call__
return handler.dispatch()
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 572, in dispatch
return self.handle_exception(e, self.app.debug)
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 570, in dispatch
return method(*args, **kwargs)
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\base_handler.py", line 65, in post
self.handle()
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\handlers.py", line 181, in handle
entity, input_reader, ctx, tstate)
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\handlers.py", line 298, in process_data
output_writer.write(output, ctx)
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\output_writers.py", line 659, in write
ctx.get_pool("file_pool").append(self._filename, str(data))
UnicodeEncodeError: 'ascii' codec can't encode character u'\u2019' in position 80: ordinal not in range(128)
这是代码:
import json
import webapp2
import urllib2
import time
import calendar
import datetime
import httplib2
from google.appengine.ext import db
from google.appengine.api import taskqueue
from google.appengine.ext import blobstore
from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.ext.webapp import blobstore_handlers
from google.appengine.ext.webapp import util
from google.appengine.ext.webapp import template
from google.appengine.api import urlfetch
from mapreduce.lib import files
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from apiclient.discovery import build
from oauth2client.appengine import AppAssertionCredentials
SCOPE = 'https://www.googleapis.com/auth/bigquery'
PROJECT_ID = 'project_id' # Your Project ID here
BQ_DATASET_ID = 'datastore_data'
GS_BUCKET = 'bucketname'
ENTITY_KIND = 'main.streamdata'
class streamdata(db.Model):
querydate = db.DateTimeProperty(auto_now_add = True)
ticker = db.StringProperty()
created_at = db.StringProperty()
tweet_id = db.StringProperty()
text = db.TextProperty()
source = db.StringProperty()
class DatastoreMapperPipeline(base_handler.PipelineBase):
def run(self, entity_type):
output = yield mapreduce_pipeline.MapperPipeline(
"Datastore Mapper %s" % entity_type,
"main.datastore_map",
"mapreduce.input_readers.DatastoreInputReader",
output_writer_spec="mapreduce.output_writers.FileOutputWriter",
params={
"input_reader":{
"entity_kind": entity_type,
},
"output_writer":{
"filesystem": "gs",
"gs_bucket_name": GS_BUCKET,
"output_sharding":"none",
}
},
shards=10)
yield CloudStorageToBigQuery(output)
class CloudStorageToBigQuery(base_handler.PipelineBase):
def run(self, csv_output):
credentials = AppAssertionCredentials(scope=SCOPE)
http = credentials.authorize(httplib2.Http())
bigquery_service = build("bigquery", "v2", http=http)
jobs = bigquery_service.jobs()
table_name = 'datastore_data_%s' % datetime.datetime.utcnow().strftime(
'%m%d%Y_%H%M%S')
files = [str(f.replace('/gs/', 'gs://')) for f in csv_output]
result = jobs.insert(projectId=PROJECT_ID,
body=build_job_data(table_name,files))
result.execute()
def build_job_data(table_name, files):
return {"projectId": PROJECT_ID,
"configuration":{
"load": {
"sourceUris": files,
"schema":{
"fields":[
{
"name":"querydate",
"type":"INTEGER",
},
{
"name":"ticker",
"type":"STRING",
},
{
"name":"created_at",
"type":"STRING",
},
{
"name":"tweet_id",
"type":"STRING",
},
{ "name":"text",
"type":"TEXT",
},
{
"name":"source",
"type":"STRING",
}
]
},
"destinationTable":{
"projectId": PROJECT_ID,
"datasetId": BQ_DATASET_ID,
"tableId": table_name,
},
"maxBadRecords": 0,
}
}
}
def datastore_map(entity_type):
data = db.to_dict(entity_type)
resultlist = [timestamp_to_posix(data.get('querydate')),
data.get('ticker'),
data.get('created_at'),
data.get('tweet_id'),
data.get('text'),
data.get('source')]
result = ','.join(['"%s"' % field for field in resultlist])
yield("%s\n" % result)
def timestamp_to_posix(timestamp):
return int(time.mktime(timestamp.timetuple()))
class DatastoretoBigQueryStart(webapp2.RequestHandler):
def get(self):
pipeline = DatastoreMapperPipeline(ENTITY_KIND)
pipeline.start()
path = pipeline.base_path + "/status?root=" + pipeline.pipeline_id
self.redirect(path)
class StreamHandler(webapp2.RequestHandler):
def get(self):
tickers = ['AAPL','GOOG', 'IBM', 'BAC', 'INTC',
'DELL', 'C', 'JPM', 'WFM', 'WMT',
'AMZN', 'HOT', 'SPG', 'SWY', 'HTSI',
'DUK', 'CEG', 'XOM', 'F', 'WFC',
'CSCO', 'UAL', 'LUV', 'DAL', 'COST', 'YUM',
'TLT', 'HYG', 'JNK', 'LQD', 'MSFT',
'GE', 'LVS', 'MGM', 'TWX', 'DIS', 'CMCSA',
'TWC', 'ORCL', 'WPO', 'NYT', 'GM', 'JCP',
'LNKD', 'OPEN', 'NFLX', 'SBUX', 'GMCR',
'SPLS', 'BBY', 'BBBY', 'YHOO', 'MAR',
'L', 'LOW', 'HD', 'HOV', 'TOL', 'NVR', 'RYL',
'GIS', 'K', 'POST', 'KRFT', 'CHK', 'GGP',
'RSE', 'RWT', 'AIG', 'CB', 'BRK.A', 'CAT']
for i in set(tickers):
url = 'http://search.twitter.com/search.json?q='
resultcount = '&rpp=100'
language = '&lang=en'
encoding = '%40%24'
tickerstring = url + encoding + i + resultcount + language
tickurl = urllib2.Request(tickerstring)
tweets = urllib2.urlopen(tickurl)
code = tweets.getcode()
if code == 200:
results = json.load(tweets, 'utf-8')
if "results" in results:
entries = results["results"]
for entry in entries:
tweet = streamdata()
created = entry['created_at']
tweetid = entry['id_str']
tweettxt = entry['text']
tweet.ticker = i
tweet.created_at = created
tweet.tweet_id = tweetid
tweet.text = tweettxt
tweet.source = "Twitter"
tweet.put()
class MainHandler(webapp2.RequestHandler):
def get(self):
self.response.out.write('<a href="/start">Click here</a> to start the Datastore to BigQuery pipeline. ')
self.response.out.write('<a href="/add_data">Click here</a> to start adding data to the datastore. ')
app = webapp2.WSGIApplication([
('/', MainHandler),
('/start', DatastoretoBigQueryStart),
('/add_data', StreamHandler)],
debug=True)
任何人可能有的任何见解都会有很大帮助。
非常感谢。