2

App Engine Mapreduce API 是否在最终的 reduce 作业中根据自己的逻辑决定计算分片大小?

我正在使用 App Engine mapreduce API 并提供了shard_size kwarg 来设置我的 mapreduce 分片大小。

分片大小在我的 mapreduce 作业中特别重要,因为我不想将太多结果批处理到任何一个给定的 reduce 函数最后一步的执行中。换句话说,我对分片大小进行了硬编码,以根据系统上的外部约束平均划分用户。

map 作业似乎可以很好地分片,但 reducer 只使用了我指定的分片的一小部分。

这是我正在处理的代码类型的粗略概述:

SHARD_SIZE = 42

def map_fun(entity):
  shard_key = random.randint(1, SHARD_SIZE)
  yield (
    shard_key,
    db.model_to_protobuf(entity).SerializeToString().encode('base64')
  )

def reduce_fun(key, entities):
  batch = []
  for entity in entities:
    #check for stuff
    batch.append(entity)
  expensive_side_effect(batch)


class MyGreatPipeline(base_handler.PipelineBase):
  def run(self, *args, **kw):
    yield mapreduce_pipeline.MapreducePipeline(
      'label'
      'path.to.map_fun',
      'path.to.reduce_fun',
      'mapreduce.input_readers.DatastoreInputReader',
      'mapreduce.output_writers.BlobstoreOutputWriter',
      mapper_params={
        'entity_kind': 'path.to.entity',
        'queue_name': 'coolQueue'
      },
      reducer_params={},
      shard_size = SHARD_SIZE
    )

map_fun具体为每个实体分配一个根据分片大小随机确定的分片。我很困惑为什么我的 reducer 的分片比SHARD_SIZE考虑到有很多实体的分片更少,而且重复挑选相同的整数的可能性极小。

4

1 回答 1

0

我很困惑你在这里做什么。使用 map 阶段将内容分组到一个小的分片键上,然后在减少时间处理这些键看起来很奇怪。你最终会为每个键做太多的工作,即使你雇佣的 reduce 工作人员和 mapper 工作人员一样多。

正在处理的“批处理”是随机的,所以我认为这expensive_side_effect()不依赖于批处理的内容。为什么不在 map 时做这个工作,发出 reduce 可以传递给输出 writer 的东西?

于 2012-06-22T04:35:25.133 回答