App Engine Mapreduce API 是否在最终的 reduce 作业中根据自己的逻辑决定计算分片大小?
我正在使用 App Engine mapreduce API 并提供了shard_size
kwarg 来设置我的 mapreduce 分片大小。
分片大小在我的 mapreduce 作业中特别重要,因为我不想将太多结果批处理到任何一个给定的 reduce 函数最后一步的执行中。换句话说,我对分片大小进行了硬编码,以根据系统上的外部约束平均划分用户。
map 作业似乎可以很好地分片,但 reducer 只使用了我指定的分片的一小部分。
这是我正在处理的代码类型的粗略概述:
SHARD_SIZE = 42
def map_fun(entity):
shard_key = random.randint(1, SHARD_SIZE)
yield (
shard_key,
db.model_to_protobuf(entity).SerializeToString().encode('base64')
)
def reduce_fun(key, entities):
batch = []
for entity in entities:
#check for stuff
batch.append(entity)
expensive_side_effect(batch)
class MyGreatPipeline(base_handler.PipelineBase):
def run(self, *args, **kw):
yield mapreduce_pipeline.MapreducePipeline(
'label'
'path.to.map_fun',
'path.to.reduce_fun',
'mapreduce.input_readers.DatastoreInputReader',
'mapreduce.output_writers.BlobstoreOutputWriter',
mapper_params={
'entity_kind': 'path.to.entity',
'queue_name': 'coolQueue'
},
reducer_params={},
shard_size = SHARD_SIZE
)
map_fun
具体为每个实体分配一个根据分片大小随机确定的分片。我很困惑为什么我的 reducer 的分片比SHARD_SIZE
考虑到有很多实体的分片更少,而且重复挑选相同的整数的可能性极小。