0

我正在使用 MapReduce(实际上只是映射)分四个阶段执行数据处理任务。每个阶段都是一个 MapReduce 作业。我需要它们按顺序运行,也就是说,在第 1 阶段完成之前不要开始第 2 阶段,等等。有没有人有这样做的经验可以分享?

理想情况下,我们会在一夜之间完成这个 4-job 序列,因此使其能够 cron 也是一件好事。

谢谢你

4

3 回答 3

1

正如 Daniel 提到的,appengine-pipeline 库旨在解决这个问题。在这篇博文中,我在“实现你自己的管道作业”部分下讨论了将 mapreduce 作业链接在一起。

为方便起见,我将在此处粘贴相关部分:

现在我们知道如何启动预定义的 MapreducePipeline,让我们来看看实现和运行我们自己的自定义管道作业。管道库提供了一个低级库,用于在 appengine 中启动任意分布式计算作业,但是,现在,我们将专门讨论如何使用它来帮助我们将 mapreduce 作业链接在一起。让我们扩展我们之前的示例以输出字符和 ID 的反向索引。

首先,我们定义父管道作业。

class ChainMapReducePipeline(mapreduce.base_handler.PipelineBase):
def run(self):
    deduped_blob_key = (
    yield mapreduce.mapreduce_pipeline.MapreducePipeline(
        "test_combiner",
        "main.map",
        "main.reduce",
        "mapreduce.input_readers.RandomStringInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        combiner_spec="main.combine",
        mapper_params={
            "string_length": 1,
            "count": 500,
        },
        reducer_params={
            "mime_type": "text/plain",
        },
        shards=16))

    char_to_id_index_blob_key = (
    yield mapreduce.mapreduce_pipeline.MapreducePipeline(
        "test_chain",
        "main.map2",
        "main.reduce2",
        "mapreduce.input_readers.BlobstoreLineInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        # Pass output from first job as input to second job
        mapper_params=(yield BlobKeys(deduped_blob_key)),
        reducer_params={
            "mime_type": "text/plain",
        },
        shards=4))

这将启动与第一个示例相同的作业,从该作业中获取输出,并将其提供给第二个作业,这将反转每个条目。请注意,第一个管道 yield 的结果被传递到第二个作业的 mapper_params 中。管道库使用魔法来检测第二个管道是否依赖于第一个完成,并且在 deduped_blob_key 解决之前不会启动它。

接下来,我必须创建 BlobKeys 辅助类。起初,我认为这没有必要,因为我可以这样做:

mapper_params={"blob_keys": deduped_blob_key},

但是,这不起作用有两个原因。首先是“生成器管道不能直接访问它产生的子管道的输出”。上面的代码将要求生成器管道使用第一个作业的输出创建一个临时 dict 对象,这是不允许的。第二个是 BlobstoreOutputWriter 返回的字符串格式为“/blobstore/”,但 BlobstoreLineInputReader 只需要“”。为了解决这些问题,我做了一个小帮手 BlobKeys 类。您会发现自己为许多工作执行此操作,并且管道库甚至包括一组通用包装器,但它们在 MapreducePipeline 框架中不起作用,我将在本节的底部讨论。

class BlobKeys(third_party.mapreduce.base_handler.PipelineBase):
  """Returns a dictionary with the supplied keyword arguments."""

  def run(self, keys):
    # Remove the key from a string in this format:
    # /blobstore/<key>
    return {
        "blob_keys": [k.split("/")[-1] for k in keys]
    }

这是 map2 和 reduce2 函数的代码:

def map2(data):
    # BlobstoreLineInputReader.next() returns a tuple
    start_position, line = data
    # Split input based on previous reduce() output format
    elements = line.split(" - ")
    random_id = elements[0]
    char = elements[1]
    # Swap 'em
    yield (char, random_id)

def reduce2(key, values):
    # Create the reverse index entry
    yield "%s - %s\n" % (key, ",".join(values))
于 2013-11-28T18:40:33.343 回答
0

您需要appengine-pipeline项目,它就是为此而生的。

于 2013-10-21T20:01:49.510 回答
0

我不熟悉 google-app-engine,但是您不能将所有作业配置放在一个主程序中,然后按顺序运行它们吗?类似于以下内容?我认为这适用于普通的 map-reduce 程序,所以如果 google-app-engine 代码没有太大不同,它应该可以正常工作。

Configuration conf1 = getConf();
Configuration conf2 = getConf();
Configuration conf3 = getConf();
Configuration conf4 = getConf();

//whatever configuration you do for the jobs

Job job1 = new Job(conf1,"name1");
Job job2 = new Job(conf2,"name2");
Job job3 = new Job(conf3,"name3");
Job job4 = new Job(conf4,"name4");

//setup for the jobs here

job1.waitForCompletion(true);
job2.waitForCompletion(true);
job3.waitForCompletion(true);
job4.waitForCompletion(true);
于 2013-10-21T19:33:53.340 回答