
I'm trying to follow this codelab, which shows you how to take data from your Google App Engine Datastore and move it through Google Cloud Storage into BigQuery by setting up a MapReduce pipeline. I set up a Google App Engine datastore entity and have a process that collects tweets about certain stocks, and I want to use that data as a test. I believe I've followed everything outlined in the example, but the shards that do all the work of breaking the data down and loading it into Cloud Storage keep raising UnicodeEncodeErrors. Here is the log from testing the application on the development app server:

INFO     2012-12-18 20:41:07,645 dev_appserver.py:3103] "POST /mapreduce/worker_callback HTTP/1.1" 500 -
WARNING  2012-12-18 20:41:07,648 taskqueue_stub.py:1981] Task appengine-mrshard-1582400592541472B07B9-0-0 failed to execute. This task will retry in 0.100 seconds
ERROR    2012-12-18 20:41:09,453 webapp2.py:1552] 'ascii' codec can't encode character u'\u2019' in position 80: ordinal not in range(128)
Traceback (most recent call last):
  File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1535, in __call__
    rv = self.handle_exception(request, response, e)
  File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1529, in __call__
    rv = self.router.dispatch(request, response)
  File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1278, in default_dispatcher
    return route.handler_adapter(request, response)
  File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1102, in __call__
    return handler.dispatch()
  File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 572, in dispatch
    return self.handle_exception(e, self.app.debug)
  File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 570, in dispatch
    return method(*args, **kwargs)
  File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\base_handler.py", line 65, in post
    self.handle()
  File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\handlers.py", line 181, in handle
    entity, input_reader, ctx, tstate)
  File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\handlers.py", line 298, in process_data
    output_writer.write(output, ctx)
  File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\output_writers.py", line 659, in write
    ctx.get_pool("file_pool").append(self._filename, str(data))
UnicodeEncodeError: 'ascii' codec can't encode character u'\u2019' in position 80: ordinal not in range(128)

Here is the code:

import json
import webapp2
import urllib2
import time
import calendar
import datetime
import httplib2

from google.appengine.ext import db
from google.appengine.api import taskqueue
from google.appengine.ext import blobstore
from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.ext.webapp import blobstore_handlers
from google.appengine.ext.webapp import util
from google.appengine.ext.webapp import template
from google.appengine.api import urlfetch

from mapreduce.lib import files
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from apiclient.discovery import build
from oauth2client.appengine import AppAssertionCredentials

SCOPE = 'https://www.googleapis.com/auth/bigquery'
PROJECT_ID = 'project_id' # Your Project ID here
BQ_DATASET_ID = 'datastore_data'
GS_BUCKET = 'bucketname'
ENTITY_KIND = 'main.streamdata'

class streamdata(db.Model):
    querydate = db.DateTimeProperty(auto_now_add = True)
    ticker = db.StringProperty()
    created_at = db.StringProperty()
    tweet_id = db.StringProperty()
    text = db.TextProperty()
    source = db.StringProperty()

class DatastoreMapperPipeline(base_handler.PipelineBase):

    def run(self, entity_type):

        output = yield mapreduce_pipeline.MapperPipeline(
          "Datastore Mapper %s" % entity_type,
          "main.datastore_map",
          "mapreduce.input_readers.DatastoreInputReader",
          output_writer_spec="mapreduce.output_writers.FileOutputWriter",
          params={
              "input_reader":{
                  "entity_kind": entity_type,
                  },
              "output_writer":{
                  "filesystem": "gs",
                  "gs_bucket_name": GS_BUCKET,
                  "output_sharding":"none",
                  }
              },
              shards=10)

        yield CloudStorageToBigQuery(output)

class CloudStorageToBigQuery(base_handler.PipelineBase):

    def run(self, csv_output):

        credentials = AppAssertionCredentials(scope=SCOPE)
        http = credentials.authorize(httplib2.Http())
        bigquery_service = build("bigquery", "v2", http=http)

        jobs = bigquery_service.jobs()
        table_name = 'datastore_data_%s' % datetime.datetime.utcnow().strftime(
            '%m%d%Y_%H%M%S')
        files = [str(f.replace('/gs/', 'gs://')) for f in csv_output]
        result = jobs.insert(projectId=PROJECT_ID,
                            body=build_job_data(table_name,files))

        result.execute()

def build_job_data(table_name, files):
  return {"projectId": PROJECT_ID,
          "configuration":{
              "load": {
                  "sourceUris": files,
                  "schema":{
                      "fields":[
                          {
                              "name":"querydate",
                              "type":"INTEGER",
                          },
                          {
                              "name":"ticker",
                              "type":"STRING",
                          },
                          {
                              "name":"created_at",
                              "type":"STRING",
                          },
                          {
                              "name":"tweet_id",
                              "type":"STRING",
                          },
                          {   "name":"text",
                              "type":"TEXT",
                          },
                          {    
                              "name":"source",
                              "type":"STRING",
                          }
                          ]
                      },
                  "destinationTable":{
                      "projectId": PROJECT_ID,
                      "datasetId": BQ_DATASET_ID,
                      "tableId": table_name,
                      },
                  "maxBadRecords": 0,
                  }
              }
          }

def datastore_map(entity_type):
    data = db.to_dict(entity_type)
    resultlist = [timestamp_to_posix(data.get('querydate')),
                    data.get('ticker'),
                    data.get('created_at'),
                    data.get('tweet_id'),
                    data.get('text'),
                    data.get('source')]
    result = ','.join(['"%s"' % field for field in resultlist])
    yield("%s\n" % result)

def timestamp_to_posix(timestamp):
    return int(time.mktime(timestamp.timetuple()))

class DatastoretoBigQueryStart(webapp2.RequestHandler):
    def get(self):
        pipeline = DatastoreMapperPipeline(ENTITY_KIND)
        pipeline.start()
        path = pipeline.base_path + "/status?root=" + pipeline.pipeline_id
        self.redirect(path)

class StreamHandler(webapp2.RequestHandler):

    def get(self):

        tickers = ['AAPL','GOOG', 'IBM', 'BAC', 'INTC',
                   'DELL', 'C', 'JPM', 'WFM', 'WMT', 
                   'AMZN', 'HOT', 'SPG', 'SWY', 'HTSI', 
                   'DUK', 'CEG', 'XOM', 'F', 'WFC', 
                   'CSCO', 'UAL', 'LUV', 'DAL', 'COST', 'YUM',
                   'TLT', 'HYG', 'JNK', 'LQD', 'MSFT',
                   'GE', 'LVS', 'MGM', 'TWX', 'DIS', 'CMCSA',
                   'TWC', 'ORCL', 'WPO', 'NYT', 'GM', 'JCP', 
                   'LNKD', 'OPEN', 'NFLX', 'SBUX', 'GMCR', 
                   'SPLS', 'BBY', 'BBBY', 'YHOO', 'MAR', 
                   'L', 'LOW', 'HD', 'HOV', 'TOL', 'NVR', 'RYL', 
                   'GIS', 'K', 'POST', 'KRFT', 'CHK', 'GGP', 
                   'RSE', 'RWT', 'AIG', 'CB', 'BRK.A', 'CAT']

        for i in set(tickers):

            url = 'http://search.twitter.com/search.json?q='
            resultcount = '&rpp=100'
            language = '&lang=en'
            encoding = '%40%24'
            tickerstring = url + encoding + i + resultcount + language
            tickurl = urllib2.Request(tickerstring)
            tweets = urllib2.urlopen(tickurl)
            code = tweets.getcode()

            if code == 200:
                results = json.load(tweets, 'utf-8')
                if "results" in results:
                    entries = results["results"]
                    for entry in entries:
                        tweet = streamdata()
                        created = entry['created_at']
                        tweetid = entry['id_str']
                        tweettxt = entry['text']
                        tweet.ticker = i
                        tweet.created_at = created
                        tweet.tweet_id = tweetid
                        tweet.text = tweettxt
                        tweet.source = "Twitter"
                        tweet.put()

class MainHandler(webapp2.RequestHandler):

    def get(self):
        self.response.out.write('<a href="/start">Click here</a> to start the Datastore to BigQuery pipeline. ')
        self.response.out.write('<a href="/add_data">Click here</a> to start adding data to the datastore. ')


app = webapp2.WSGIApplication([
                               ('/', MainHandler),
                               ('/start', DatastoretoBigQueryStart), 
                               ('/add_data', StreamHandler)], 
                              debug=True)

Any insight anyone might have would be a big help.

Many thanks.


2 Answers


You are converting Unicode data to a bytestring here:

ctx.get_pool("file_pool").append(self._filename, str(data))

When you do this without specifying an encoding, Python falls back to the default, which is ASCII. You need to use a different encoding instead, one that can handle all of the Unicode code points your data contains.
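
For instance, you can reproduce the failure in isolation in a Python 2 shell, using the same U+2019 character that appears in your traceback:

>>> str(u'It\u2019s')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
UnicodeEncodeError: 'ascii' codec can't encode character u'\u2019' in position 2: ordinal not in range(128)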

For most text, UTF-8 is a good choice; if you have a lot of non-Western text (Arabic, Asian scripts, and so on), UTF-16 may be more efficient. In either case, you have to encode explicitly:

ctx.get_pool("file_pool").append(self._filename, data.encode('utf8'))

When reading the data back from that file, use filedata.decode('utf8') to decode it back to Unicode.
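
A minimal round-trip, assuming UTF-8 throughout:

data = u'It\u2019s'                # unicode containing a non-ASCII code point
raw = data.encode('utf8')          # the quote becomes the three bytes '\xe2\x80\x99'
assert raw.decode('utf8') == data  # decoding restores the original unicode string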

For more information on Python and Unicode, see the Python Unicode HOWTO.
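
If you would rather not patch output_writers.py inside the mapreduce library, the same fix can live in your own mapper instead, so the writer only ever receives byte strings. A minimal sketch, based on the datastore_map function from the question (assuming every string field may contain unicode):

def datastore_map(entity_type):
    data = db.to_dict(entity_type)
    resultlist = [timestamp_to_posix(data.get('querydate')),
                  data.get('ticker'),
                  data.get('created_at'),
                  data.get('tweet_id'),
                  data.get('text'),
                  data.get('source')]
    # Encode each unicode field as UTF-8 up front, so the joined CSV row is
    # already a bytestring and the writer's str(data) call has nothing to encode.
    encoded = [f.encode('utf8') if isinstance(f, unicode) else f
               for f in resultlist]
    result = ','.join(['"%s"' % f for f in encoded])
    yield ("%s\n" % result)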

answered 2012-12-18T21:12:30.227
ctx.get_pool("file_pool").append(self._filename, str(data))

This will fail if the data contains unicode characters. Try:

ctx.get_pool("file_pool").append(self._filename, unicode(data))
answered 2012-12-18T21:03:25.067