
I wrote a simple MapReduce pipeline to read lines from a CSV file on Google Cloud Storage and create an entity from each one. However, I can't seem to get it to run on more than one shard.

The code uses mapreduce.control.start_map and looks like this:

class LoadEntitiesPipeline(webapp2.RequestHandler):
    def get(self):
        id = control.start_map(map_name,
                               handler_spec="backend.line_processor",
                               reader_spec="mapreduce.input_readers.FileInputReader",
                               queue_name=get_queue_name("q-1"),
                               shard_count=shard_count,
                               mapper_parameters={
                                   'shard_count': shard_count,
                                   'batch_size': 50,
                                   'processing_rate': 1000000,
                                   'files': [gsfile],
                                   'format': 'lines'})

I have shard_count in both places because I wasn't sure which one actually needs it. Setting shard_count to anything between 8 and 32 doesn't change a thing: the status page always shows 1/1 shards running. To separate things out, I made everything run on a backend queue with a large number of instances, and I've tried adjusting the queue parameters per this wiki. In the end it just seems to run serially.
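
The queue tuning I mean is along these lines (an illustrative queue.yaml; the numbers and the backend name are placeholders, not my exact settings):

# queue.yaml (illustrative values only)
queue:
- name: q-1
  rate: 500/s
  bucket_size: 100
  max_concurrent_requests: 500
  target: my-backend  # placeholder backend name; routes the tasks to backend instances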

Any ideas? Thanks!

Update (still no luck):

While trying to isolate things, I tried invoking the pipeline with direct calls, like this:

class ImportHandler(webapp2.RequestHandler):

    def get(self, gsfile):
        pipeline = LoadEntitiesPipeline2(gsfile)
        pipeline.start(queue_name=get_queue_name("q-1"))

        self.redirect(pipeline.base_path + "/status?root=" + pipeline.pipeline_id)


class LoadEntitiesPipeline2(base_handler.PipelineBase):

    def run(self, gsfile):
        yield mapreduce_pipeline.MapperPipeline(
            'loadentities2_' + gsfile,
            'backend.line_processor',
            'mapreduce.input_readers.FileInputReader',
            params={'files': [gsfile], 'format': 'lines'},
            shards=32)

With this new code it still runs on only one shard. I'm beginning to doubt whether mapreduce.input_readers.FileInputReader is capable of parallelizing its input by line at all.


3 Answers


It looks like FileInputReader can only shard by file. The format parameter only changes the way the mapper function gets called. If you pass more than one file to the mapper, it will start running on more than one shard; otherwise it will use only one shard to process the data.
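
For example (a sketch that reuses the start_map call from the question; the pre-split file names are hypothetical), passing several GCS files gives the reader something to shard on:

# With a single file you always get 1/1 shards; with N files you can get up to N.
id = control.start_map(map_name,
                       handler_spec="backend.line_processor",
                       reader_spec="mapreduce.input_readers.FileInputReader",
                       queue_name=get_queue_name("q-1"),
                       shard_count=shard_count,
                       mapper_parameters={
                           # hypothetical pre-split pieces of the original CSV
                           'files': [gsfile_part1, gsfile_part2, gsfile_part3],
                           'format': 'lines'})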

EDIT #1:

After digging deeper into the mapreduce library: MapReduce decides whether or not to split a file into pieces based on what the can_split method returns for each file type it defines. Currently, the only format that implements the split method is ZipFormat, so if your file format is not zip, the file won't be split to run on more than one shard.

@classmethod
def can_split(cls):
  """Indicates whether this format support splitting within a file boundary.

  Returns:
    True if a FileFormat allows its inputs to be splitted into
    different shards.
  """

https://code.google.com/p/appengine-mapreduce/source/browse/trunk/python/src/mapreduce/file_formats.py

But it looks like it is possible to write your own file format's split method. You could try hacking a split method onto _TextFormat first and see whether more than one shard runs.

@classmethod
def split(cls, desired_size, start_index, opened_file, cache):
    pass

EDIT #2:

An easy workaround would be to leave FileInputReader running serially but move the time-consuming work to the parallel reduce stage.

import random

def line_processor(line):
    # Serial: the map stage reads the file on a single shard.
    # Emit a random key so the lines spread evenly across reduce shards.
    yield (random.randrange(1000), line)

def reducer(key, values):
    # Parallel: the reduce stage fans out across shards by key.
    entities = []
    for v in values:
        entities.append(CREATE_ENTITY_FROM_VALUE(v))  # placeholder for your entity-building code
    db.put(entities)
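
To wire the two stages together, something like mapreduce_pipeline.MapreducePipeline should work. This is only a sketch assembled from names already used in this thread; the job name and the reducer path 'backend.reducer' are assumptions:

from mapreduce import mapreduce_pipeline

class LoadEntitiesMapReduce(base_handler.PipelineBase):

    def run(self, gsfile):
        # The map stage still reads the file serially; the shard count
        # now buys parallelism in the reduce stage.
        yield mapreduce_pipeline.MapreducePipeline(
            'load_entities_' + gsfile,  # job name (assumed)
            mapper_spec='backend.line_processor',
            reducer_spec='backend.reducer',  # assumed module path for reducer()
            input_reader_spec='mapreduce.input_readers.FileInputReader',
            mapper_params={'files': [gsfile], 'format': 'lines'},
            shards=32)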

EDIT #3:

If you want to try modifying the FileFormat, here is an example (not tested yet):

from file_formats import _TextFormat, FORMATS


class _LinesSplitFormat(_TextFormat):
  """Read file line by line."""

  NAME = 'split_lines'

  def get_next(self):
    """Inherited."""
    index = self.get_index()
    cache = self.get_cache()
    # Sum the cached line lengths to find the byte offset where this
    # shard's slice of the file begins.
    offset = sum(cache['infolist'][:index])

    self.get_current_file().seek(offset)
    result = self.get_current_file().readline()
    if not result:
      raise EOFError()
    if 'encoding' in self._kwargs:
      result = result.encode(self._kwargs['encoding'])
    return result

  @classmethod
  def can_split(cls):
    """Inherited."""
    return True

  @classmethod
  def split(cls, desired_size, start_index, opened_file, cache):
    """Inherited."""
    if 'infolist' in cache:
      infolist = cache['infolist']
    else:
      # Record every line's length once so shard boundaries can be
      # computed as byte offsets without rescanning the file.
      infolist = []
      for i in opened_file:
        infolist.append(len(i))
      cache['infolist'] = infolist

    # Advance the index until this shard has consumed roughly desired_size bytes.
    index = start_index
    while desired_size > 0 and index < len(infolist):
      desired_size -= infolist[index]
      index += 1
    return desired_size, index


FORMATS['split_lines'] = _LinesSplitFormat

Then the new file format can be used by changing format in mapper_parameters from lines to split_lines:

class LoadEntitiesPipeline(webapp2.RequestHandler):
    def get(self):
        id = control.start_map(map_name,
                               handler_spec="backend.line_processor",
                               reader_spec="mapreduce.input_readers.FileInputReader",
                               queue_name=get_queue_name("q-1"),
                               shard_count=shard_count,
                               mapper_parameters={
                                   'shard_count': shard_count,
                                   'batch_size': 50,
                                   'processing_rate': 1000000,
                                   'files': [gsfile],
                                   'format': 'split_lines'})
Answered on 2013-10-11T15:27:17.143

It seems to me that FileInputReader should be able to shard, based on: https://code.google.com/p/appengine-mapreduce/source/browse/trunk/python/src/mapreduce/input_readers.py

It looks like 'format': 'lines' should split using: self.get_current_file().readline()

When it runs serially, does it seem to interpret the lines correctly? Maybe the newlines are in the wrong encoding or something.

Answered on 2013-10-08T22:11:17.727

In my experience, FileInputReader will do at most one shard per file. Solution: split your big files. I use split_file from https://github.com/johnwlockwood/karl_data to shard files before uploading them to Cloud Storage. If the big files are already there, you can use a Compute Engine instance to pull them down and do the sharding, since the transfer speed will be fastest. FYI: karld is in the cheeseshop, so you can pip install karld.
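
If you'd rather not add a dependency, a minimal pure-Python splitter is easy to sketch (the chunk size and the numbered-suffix naming here are arbitrary choices, not karld's actual behavior):

def split_text_file(path, lines_per_chunk=100000):
    """Split a large text file into numbered chunk files before upload."""
    chunk, count = None, 0
    with open(path) as source:
        for i, line in enumerate(source):
            if i % lines_per_chunk == 0:
                if chunk:
                    chunk.close()
                chunk = open('%s.%04d' % (path, count), 'w')
                count += 1
            chunk.write(line)
    if chunk:
        chunk.close()
    return count  # number of chunk files written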

Answered on 2013-10-11T06:06:48.103