我正在使用 MapReduce(实际上只是映射)分四个阶段执行数据处理任务。每个阶段都是一个 MapReduce 作业。我需要它们按顺序运行,也就是说,在第 1 阶段完成之前不要开始第 2 阶段,等等。有没有人有这样做的经验可以分享?
理想情况下,我们会在一夜之间完成这个 4-job 序列,因此使其能够 cron 也是一件好事。
谢谢你
我正在使用 MapReduce(实际上只是映射)分四个阶段执行数据处理任务。每个阶段都是一个 MapReduce 作业。我需要它们按顺序运行,也就是说,在第 1 阶段完成之前不要开始第 2 阶段,等等。有没有人有这样做的经验可以分享?
理想情况下,我们会在一夜之间完成这个 4-job 序列,因此使其能够 cron 也是一件好事。
谢谢你
正如 Daniel 提到的,appengine-pipeline 库旨在解决这个问题。在这篇博文中,我在“实现你自己的管道作业”部分下讨论了将 mapreduce 作业链接在一起。
为方便起见,我将在此处粘贴相关部分:
现在我们知道如何启动预定义的 MapreducePipeline,让我们来看看实现和运行我们自己的自定义管道作业。管道库提供了一个低级库,用于在 appengine 中启动任意分布式计算作业,但是,现在,我们将专门讨论如何使用它来帮助我们将 mapreduce 作业链接在一起。让我们扩展我们之前的示例以输出字符和 ID 的反向索引。
首先,我们定义父管道作业。
class ChainMapReducePipeline(mapreduce.base_handler.PipelineBase):
def run(self):
deduped_blob_key = (
yield mapreduce.mapreduce_pipeline.MapreducePipeline(
"test_combiner",
"main.map",
"main.reduce",
"mapreduce.input_readers.RandomStringInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
combiner_spec="main.combine",
mapper_params={
"string_length": 1,
"count": 500,
},
reducer_params={
"mime_type": "text/plain",
},
shards=16))
char_to_id_index_blob_key = (
yield mapreduce.mapreduce_pipeline.MapreducePipeline(
"test_chain",
"main.map2",
"main.reduce2",
"mapreduce.input_readers.BlobstoreLineInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
# Pass output from first job as input to second job
mapper_params=(yield BlobKeys(deduped_blob_key)),
reducer_params={
"mime_type": "text/plain",
},
shards=4))
这将启动与第一个示例相同的作业,从该作业中获取输出,并将其提供给第二个作业,这将反转每个条目。请注意,第一个管道 yield 的结果被传递到第二个作业的 mapper_params 中。管道库使用魔法来检测第二个管道是否依赖于第一个完成,并且在 deduped_blob_key 解决之前不会启动它。
接下来,我必须创建 BlobKeys 辅助类。起初,我认为这没有必要,因为我可以这样做:
mapper_params={"blob_keys": deduped_blob_key},
但是,这不起作用有两个原因。首先是“生成器管道不能直接访问它产生的子管道的输出”。上面的代码将要求生成器管道使用第一个作业的输出创建一个临时 dict 对象,这是不允许的。第二个是 BlobstoreOutputWriter 返回的字符串格式为“/blobstore/”,但 BlobstoreLineInputReader 只需要“”。为了解决这些问题,我做了一个小帮手 BlobKeys 类。您会发现自己为许多工作执行此操作,并且管道库甚至包括一组通用包装器,但它们在 MapreducePipeline 框架中不起作用,我将在本节的底部讨论。
class BlobKeys(third_party.mapreduce.base_handler.PipelineBase):
"""Returns a dictionary with the supplied keyword arguments."""
def run(self, keys):
# Remove the key from a string in this format:
# /blobstore/<key>
return {
"blob_keys": [k.split("/")[-1] for k in keys]
}
这是 map2 和 reduce2 函数的代码:
def map2(data):
# BlobstoreLineInputReader.next() returns a tuple
start_position, line = data
# Split input based on previous reduce() output format
elements = line.split(" - ")
random_id = elements[0]
char = elements[1]
# Swap 'em
yield (char, random_id)
def reduce2(key, values):
# Create the reverse index entry
yield "%s - %s\n" % (key, ",".join(values))
您需要appengine-pipeline项目,它就是为此而生的。
我不熟悉 google-app-engine,但是您不能将所有作业配置放在一个主程序中,然后按顺序运行它们吗?类似于以下内容?我认为这适用于普通的 map-reduce 程序,所以如果 google-app-engine 代码没有太大不同,它应该可以正常工作。
Configuration conf1 = getConf();
Configuration conf2 = getConf();
Configuration conf3 = getConf();
Configuration conf4 = getConf();
//whatever configuration you do for the jobs
Job job1 = new Job(conf1,"name1");
Job job2 = new Job(conf2,"name2");
Job job3 = new Job(conf3,"name3");
Job job4 = new Job(conf4,"name4");
//setup for the jobs here
job1.waitForCompletion(true);
job2.waitForCompletion(true);
job3.waitForCompletion(true);
job4.waitForCompletion(true);