我想通过以下步骤执行 mapreduce 作业:
1) 从通过 html 表单输入的文件映射
2)减少并从减少中创建新文件
这是我目前的管道设置方式。主要问题是如何将文件传递给 RecordsReader。
RecordsReader 有一个参数 files,要传递“包含要读取的文件的字符串或包含要读取的多个文件字符串的列表”。-谷歌
class EPNPipeline(base_handler.PipelineBase):
def run(self, filekey):
logging.debug("filename is %s" % filekey)
output = yield mapreduce_pipeline.MapreducePipeline(
"EPN",
"map_process_epn",
"reduce_process_epn",
"mapreduce.input_readers.RecordsReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
mapper_params={
"files": filekey,
},
reducer_params={
"mime_type": "text/plain",
},
shards=24)
yield StoreOutput("EPN", filekey, output)
我尝试将文件作为文件对象和字符串表示形式传递,但两者都不起作用,并且没有像我想要的那样使用不同的输入/输出读取器/写入器的文档。
任何帮助或指示将不胜感激。
谢谢