我编写了一个简单的 MapReduce 流程,以从 Google Cloud Storage 上的文件中的 CSV 中读取行,然后创建一个实体。但是,我似乎无法让它在多个 shard 上运行。
该代码使用了 mapreduce.control.start_map,看起来像这样。
class LoadEntitiesPipeline(webapp2.RequestHandler):
id = control.start_map(map_name,
handler_spec="backend.line_processor",
reader_spec="mapreduce.input_readers.FileInputReader",
queue_name=get_queue_name("q-1"),
shard_count=shard_count,
mapper_parameters={
'shard_count': shard_count,
'batch_size': 50,
'processing_rate': 1000000,
'files': [gsfile],
'format': 'lines'})
我在这两个地方都有 shard_count ,因为我不确定哪些方法真正需要它。将 shard_count 设置为 8 到 32 之间的任何值都不会改变任何东西,因为状态页面总是显示 1/1 分片正在运行。为了分开事情,我让所有东西都在一个有大量实例的后端队列上运行。我已经尝试根据这个 wiki调整队列参数。最后,它似乎只是串行运行。
有任何想法吗?谢谢!
更新(仍然没有成功):
在尝试隔离事物时,我尝试使用直接调用管道进行调用,如下所示:
class ImportHandler(webapp2.RequestHandler):
def get(self, gsfile):
pipeline = LoadEntitiesPipeline2(gsfile)
pipeline.start(queue_name=get_queue_name("q-1"))
self.redirect(pipeline.base_path + "/status?root=" + pipeline.pipeline_id)
class LoadEntitiesPipeline2(base_handler.PipelineBase):
def run(self, gsfile):
yield mapreduce_pipeline.MapperPipeline(
'loadentities2_' + gsfile,
'backend.line_processor',
'mapreduce.input_readers.FileInputReader',
params={'files': [gsfile], 'format': 'lines'},
shards=32
)
有了这个新代码,它仍然只在一个分片上运行。 我开始怀疑 mapreduce.input_readers.FileInputReader 是否能够按行并行化输入。