
I'm working on a project that requires building a huge knowledge base from word co-occurrences in text. As far as I've researched, nothing like it has been attempted on App Engine. I'd like to use App Engine's flexibility and scalability to be able to serve the knowledge base and do reasoning over it for a wide range of users.

So far I've come up with a MapReduce implementation based on the demo pipeline application. The source texts are stored in the Blobstore as zipped files, each containing a single XML document, which in turn holds a variable number of articles (as many as 30,000).

The first step was to adapt the current BlobstoreZipLineInputReader so that it parses an XML file and retrieves the relevant information from it. The XMLParser class uses the lxml iterparse approach from http://www.ibm.com/developerworks/xml/library/x-hiperfparse/ to retrieve the XML elements to process, and returns an iterator.
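
For reference, a minimal sketch of what that parser could look like (the XMLParser and parseXML names come from the description above; the 'article' tag, the 'id' attribute and the Article tuple are my own assumptions for illustration):

import collections
from lxml import etree

# The map function below expects objects with .id and .body, so the parser
# is sketched here as yielding a small named tuple; the real extraction
# depends on the actual schema of the source documents.
Article = collections.namedtuple('Article', ['id', 'body'])

class XMLParser(object):
  """Streams articles out of a large XML file with lxml's iterparse."""

  def parseXML(self, fileobj):
    # 'article' as the element tag and 'id' as an attribute are assumed
    # for illustration; adjust to the real structure of the source files.
    context = etree.iterparse(fileobj, events=('end',), tag='article')
    for _, elem in context:
      yield Article(id=elem.get('id'), body=elem.text or '')
      # Clear processed elements so memory stays flat, as recommended in
      # the IBM developerWorks article linked above.
      elem.clear()
      while elem.getprevious() is not None:
        del elem.getparent()[0]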

The modified class, BlobstoreXMLZipLineInputReader, has a slightly different next function:

def next(self):
  if not self._filestream:
    if not self._zip:
      self._zip = zipfile.ZipFile(self._reader(self._blob_key))
      self._entries = self._zip.infolist()[self._start_file_index:
                                           self._end_file_index]
      self._entries.reverse()
    if not self._entries:
      raise StopIteration()
    entry = self._entries.pop()

    parser = XMLParser()
    # the result here is an iterator with the individual articles
    self._filestream = parser.parseXML(self._zip.open(entry.filename))

  try:
    article = self._filestream.next()
    self._article_index += 1
  except StopIteration:
    article = None

  if not article:
    self._filestream.close()
    self._filestream = None
    self._start_file_index += 1
    self._initial_offset = 0
    return self.next()

  return ((self._blob_key, self._start_file_index, self._article_index),
          article)

The map function then receives each of these articles, splits them by sentence and then by word:

def map_function(data):
  """Word count map function."""
  (entry, article) = data
  for s in split_into_sentences(article.body):
    for w in split_into_words(s.lower()):
      if w not in STOPWORDS:
        yield (w, article.id)
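
The splitting helpers and STOPWORDS aren't shown above; a minimal sketch of what they might look like, using plain regular expressions (the names match the ones used in map_function, but the implementations and the stopword list are placeholders):

import re

# A tiny illustrative stopword set; the real list would be much larger.
STOPWORDS = frozenset(['the', 'a', 'an', 'and', 'or', 'of', 'to', 'in'])

def split_into_sentences(text):
  # Naive split on sentence-ending punctuation, good enough for a sketch.
  return [s for s in re.split(r'(?<=[.!?])\s+', text) if s]

def split_into_words(sentence):
  # map_function lowercases the sentence first, so plain [a-z]+ suffices.
  return re.findall(r'[a-z]+', sentence)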

The reducer aggregates the words and joins them with the ids of the articles they appear in:

def reduce_function(key, values):
  """Word count reduce function."""
  yield "%s: %s\n" % (key, list(set(values)))

This works flawlessly, both on the development server and in the live setup, for up to about 10,000 texts (there aren't that many words in them). It generally takes no more than 10 seconds. The problem is when it goes a bit beyond that: the MapReduce seems to hang, processing the job endlessly. The number of items processed per shard just keeps growing, and my write-operation quota is hit very quickly.

Q1. Is there some sort of limit on how many map operations a MapReduce pipeline can perform before it starts "misbehaving"?
Q2. Is there a better approach to my problem?
Q3. I know this has been asked before, but can I bypass the temporary MapReduce datastore writes? They're killing me...

PS: Here's my main MapReduce call:

class XMLArticlePipeline(base_handler.PipelineBase):
  def run(self, filekey, blobkey):
    output = yield mapreduce_pipeline.MapreducePipeline(
        "process_xml",
        "backend.build_knowledgebase.map_function",
        "backend.build_knowledgebase.reduce_function",
        "backend.build_knowledgebase.BlobstoreXMLZipLineInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        mapper_params={
            "blob_keys": [blobkey],
        },
        reducer_params={
            "mime_type": "text/plain",
        },
        shards=12)
    yield StoreOutput(filekey, output)
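
StoreOutput isn't shown above; in the demo application it's just a small pipeline that records where the MapReduce output ended up on a datastore entity. A minimal sketch under that assumption (the FileMetadata model and its result_link property are hypothetical stand-ins):

import logging

from google.appengine.ext import db
from mapreduce import base_handler


class FileMetadata(db.Model):
  # Hypothetical entity tracking one uploaded source file, keyed by filekey.
  result_link = db.StringProperty()


class StoreOutput(base_handler.PipelineBase):
  """Stores the BlobstoreOutputWriter result path on the source's metadata."""

  def run(self, filekey, output):
    logging.debug("MapReduce output written to %s", output)
    entity = FileMetadata.get_by_key_name(filekey)
    if entity is not None:
      # 'output' is the list of file names produced by the output writer.
      entity.result_link = output[0]
      entity.put()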

EDIT: I get some strange errors in the dev server when running the never-ending job:

[App Instance] [0] [dev_appserver_multiprocess.py:821] INFO Exception in HandleRequestThread
Traceback (most recent call last):
  File "/Applications/GoogleAppEngineLauncher.app/Contents/Resources/GoogleAppEngine-default.bundle/Contents/Resources/google_appengine/google/appengine/tools/dev_appserver_multiprocess.py", line 819, in run
    HandleRequestDirectly(request, client_address)
  File "/Applications/GoogleAppEngineLauncher.app/Contents/Resources/GoogleAppEngine-default.bundle/Contents/Resources/google_appengine/google/appengine/tools/dev_appserver_multiprocess.py", line 957, in HandleRequestDirectly
    HttpServer(), request, client_address)
  File "/usr/local/Cellar/python/2.7.2/lib/python2.7/SocketServer.py", line 310, in process_request
    self.finish_request(request, client_address)
  File "/usr/local/Cellar/python/2.7.2/lib/python2.7/SocketServer.py", line 323, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Applications/GoogleAppEngineLauncher.app/Contents/Resources/GoogleAppEngine-default.bundle/Contents/Resources/google_appengine/google/appengine/tools/dev_appserver.py", line 2579, in __init__
    BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
  File "/usr/local/Cellar/python/2.7.2/lib/python2.7/SocketServer.py", line 641, in __init__
    self.finish()
  File "/usr/local/Cellar/python/2.7.2/lib/python2.7/SocketServer.py", line 694, in finish
    self.wfile.flush()
  File "/usr/local/Cellar/python/2.7.2/lib/python2.7/socket.py", line 303, in flush
    self._sock.sendall(view[write_offset:write_offset+buffer_size])
error: [Errno 32] Broken pipe