2

我有一系列 Map/Reduce 工作:

Job1 将带有时间戳的数据作为键,将一些数据作为值并对其进行转换。

对于 Job2,我需要将 Job1 中所有映射器中出现的最大时间戳作为参数传递。(我知道如何将参数传递给 Mappers/Reducers)

我可以跟踪 Job1 的每个映射器中的最大时间戳,但是如何获得所有映射器的最大值并将其作为参数传递给 Job2?

我想避免仅仅为了确定最大时间戳而运行 Map/Reduce 作业,因为我的数据集的大小是 TB+ 规模。

有没有办法使用 Hadoop 或 Zookeeper 来实现这一点?

4

2 回答 2

1

没有办法 2 个地图可以互相交谈。所以只有地图的工作(job1)不能让你获得全局最大值。时间戳。但是,我可以想到以下两种方法。

我假设您的 job1 目前是仅地图作业,并且您正在编写地图本身的输出。

A. 更改您的映射器以使用 MultipleOutputs 而不是 Context 或 OutputCollector 写入主输出。使用 context.write() 将附加的 (key,value) 对作为 (constant,timestamp) 发出。这样,您只打乱 (constant,timestamp) ) 对减速器。添加一个计算最大值的减速器。在它收到的值中。运行作业,reducer 的数量设置为 1。从 mapper 写入的输出将为您提供原始输出,而从 reducer 写入的输出将为您提供全局最大值。时间戳。

B. 在 job1 中,写下最大值。每个映射器中的时间戳作为输出。您可以在 cleanup() 中执行此操作。使用 MultipleOutputs 写入原始输出之外的文件夹。作业 1 完成后,假设作业 1 中有“x”映射器,则输出文件夹中有“x”零件文件。您可以在此文件夹上执行 getmerge 以将所有零件文件放入单个本地文件中。该文件将具有“x”行每行都包含一个时间戳。您可以使用独立的 java 程序阅读此内容,找到全局最大值。时间戳并将其保存在某个本地文件中。使用 distrib 缓存将此文件共享给 job2 或传递全局最大值。作为参数。

于 2013-05-21T09:15:49.200 回答
1

我建议执行以下操作,创建一个目录,您可以在其中将每个 Mapper 的最大值放在一个文件中,即映射器名称+id。这个想法是有第二个输出目录,为了避免并发问题,只需确保每个映射器都写入一个唯一的文件。将最大值保留为变量并将其写入每个映射器cleanup方法的文件中。

作业完成后,迭代二级输出目录以找到最大值是微不足道的。

于 2013-05-20T15:16:07.167 回答