I'm working on a project that needs to build a huge knowledge base from word co-occurrences in text. As far as I have researched, nothing like this has been attempted on App Engine before. I would like to use App Engine's flexibility and scalability to be able to serve the knowledge base and do reasoning over it for a wide range of users.
So far I have come up with a mapreduce implementation based on the demo pipeline application. The source texts are stored in the blobstore as zipped files containing a single XML document each, every document holding a variable number of articles (as many as 30000).
The first step was to adapt the current BlobstoreZipLineInputReader so that it parses the XML file and retrieves the relevant information from it. The XMLParser class uses the lxml iterparse approach from http://www.ibm.com/developerworks/xml/library/x-hiperfparse/ to retrieve the XML elements to process, and returns an iterator.

The modified class, BlobstoreXMLZipLineInputReader, has a slightly different next function:
def next(self):
    if not self._filestream:
        if not self._zip:
            self._zip = zipfile.ZipFile(self._reader(self._blob_key))
            # Process only the zip entries assigned to this shard.
            self._entries = self._zip.infolist()[self._start_file_index:
                                                 self._end_file_index]
            self._entries.reverse()
        if not self._entries:
            raise StopIteration()
        entry = self._entries.pop()
        parser = XMLParser()
        # The result here is an iterator over the individual articles.
        self._filestream = parser.parseXML(self._zip.open(entry.filename))
    try:
        article = self._filestream.next()
        self._article_index += 1
    except StopIteration:
        article = None
    if not article:
        # Current file is exhausted; advance to the next zip entry.
        self._filestream.close()
        self._filestream = None
        self._start_file_index += 1
        self._initial_offset = 0
        return self.next()
    return ((self._blob_key, self._start_file_index, self._article_index),
            article)
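For context, next() treats parseXML as a generator over article objects exposing at least .id and .body (both are used by the map function below). The parser itself is not shown in the question; here is a minimal sketch, assuming the lxml iterparse pattern from the IBM article above and a hypothetical <article> element with an id attribute and a body child:

from collections import namedtuple
from lxml import etree

# Hypothetical article record; the question only shows that articles
# expose .id and .body.
Article = namedtuple('Article', ['id', 'body'])

class XMLParser(object):
    def parseXML(self, fileobj):
        # Stream the document with iterparse, yielding one article at a
        # time and clearing processed elements to keep memory bounded.
        for _, elem in etree.iterparse(fileobj, tag='article'):
            yield Article(id=elem.get('id'), body=elem.findtext('body'))
            elem.clear()
            while elem.getprevious() is not None:
                del elem.getparent()[0]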
The map function then receives each of these articles and splits it into sentences, and then into words:
def map_function(data):
    """Word count map function."""
    (entry, article) = data
    for s in split_into_sentences(article.body):
        for w in split_into_words(s.lower()):
            if w not in STOPWORDS:
                yield (w, article.id)
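split_into_sentences, split_into_words, and STOPWORDS are not defined in the question; a minimal regex-based sketch of what they could look like:

import re

# Hypothetical stand-ins for the helpers used by map_function.
STOPWORDS = frozenset(['a', 'an', 'and', 'in', 'of', 'or', 'the', 'to'])

def split_into_sentences(text):
    # Naive split on sentence-ending punctuation followed by whitespace.
    return re.split(r'(?<=[.!?])\s+', text)

def split_into_words(sentence):
    # Keep alphanumeric runs (the input is already lowercased).
    return re.findall(r'[a-z0-9]+', sentence)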
The reducer aggregates the words and joins the ids of the articles they appear in:
def reduce_function(key, values):
    """Word count reduce function."""
    yield "%s: %s\n" % (key, list(set(values)))
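So each reduce call emits one output line per word, with duplicate article ids collapsed by set(); for example:

list(reduce_function('knowledge', ['12', '7', '12']))
# -> ["knowledge: ['12', '7']\n"]  (set ordering is arbitrary)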
This works flawlessly, both on the dev server and on the live setup, with up to about 10,000 texts (which do not have that many words in them); it generally takes no more than 10 seconds. The problem is that when the input goes a bit beyond that, the mapreduce seems to just hang, processing the job endlessly. The number of processed items per shard only keeps growing, and my write-operation quota is reached in no time.
Q1. Is there some kind of limit on how many map operations the mapreduce pipeline can perform before it starts misbehaving?
Q2. Is there a better approach to my problem?
Q3. I know this has been asked before, but can I bypass the temporary mapreduce datastore writes? They're killing me...
P.S.: Here is my main mapreduce call:
class XMLArticlePipeline(base_handler.PipelineBase):
    def run(self, filekey, blobkey):
        output = yield mapreduce_pipeline.MapreducePipeline(
            "process_xml",
            "backend.build_knowledgebase.map_function",
            "backend.build_knowledgebase.reduce_function",
            "backend.build_knowledgebase.BlobstoreXMLZipLineInputReader",
            "mapreduce.output_writers.BlobstoreOutputWriter",
            mapper_params={
                "blob_keys": [blobkey],
            },
            reducer_params={
                "mime_type": "text/plain",
            },
            shards=12)
        yield StoreOutput(filekey, output)
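For completeness, a sketch of how such a pipeline is typically started from a request handler (the actual kickoff code and the StoreOutput pipeline are not shown in the question):

# Hypothetical kickoff inside a webapp handler; filekey and blobkey
# come from the upload handler.
pipeline = XMLArticlePipeline(filekey, blobkey)
pipeline.start()
# The pipeline library serves a status UI under its base path
# (/_ah/pipeline by default).
self.redirect(pipeline.base_path + '/status?root=' + pipeline.pipeline_id)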
Edit: I get these strange errors in the dev server when running a never-ending job:
[App Instance] [0] [dev_appserver_multiprocess.py:821] INFO Exception in HandleRequestThread
Traceback (most recent call last):
  File "/Applications/GoogleAppEngineLauncher.app/Contents/Resources/GoogleAppEngine-default.bundle/Contents/Resources/google_appengine/google/appengine/tools/dev_appserver_multiprocess.py", line 819, in run
    HandleRequestDirectly(request, client_address)
  File "/Applications/GoogleAppEngineLauncher.app/Contents/Resources/GoogleAppEngine-default.bundle/Contents/Resources/google_appengine/google/appengine/tools/dev_appserver_multiprocess.py", line 957, in HandleRequestDirectly
    HttpServer(), request, client_address)
  File "/usr/local/Cellar/python/2.7.2/lib/python2.7/SocketServer.py", line 310, in process_request
    self.finish_request(request, client_address)
  File "/usr/local/Cellar/python/2.7.2/lib/python2.7/SocketServer.py", line 323, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Applications/GoogleAppEngineLauncher.app/Contents/Resources/GoogleAppEngine-default.bundle/Contents/Resources/google_appengine/google/appengine/tools/dev_appserver.py", line 2579, in __init__
    BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
  File "/usr/local/Cellar/python/2.7.2/lib/python2.7/SocketServer.py", line 641, in __init__
    self.finish()
  File "/usr/local/Cellar/python/2.7.2/lib/python2.7/SocketServer.py", line 694, in finish
    self.wfile.flush()
  File "/usr/local/Cellar/python/2.7.2/lib/python2.7/socket.py", line 303, in flush
    self._sock.sendall(view[write_offset:write_offset+buffer_size])
error: [Errno 32] Broken pipe