4

我们有一个系统,它接收指定目录上的档案,并定期启动一个 mapreduce 作业,该作业打开档案并处理其中的文件。为了避免下次重新处理相同的档案,我们在 RecordReader 上使用 close() 方法,以便在读取最后一个条目后将其删除。

这种方法的问题(我们认为)是,如果一个特定的映射失败,下一个对其进行另一次尝试的映射器会发现原始文件已被记录读取器从第一个文件中删除,并且它会被炸毁。我们认为要走的路是等到所有映射和归约完成,然后删除输入档案。

这是最好的方法吗?

如果是这样,我们如何从主程序中获取系统找到的所有输入文件的列表?(我们不能只清理整个输入目录,可能存在新文件)

IE:

   . . .

   job.waitForCompletion(true);

   (we're done, delete input files, how?)

   return 0;
}
4

4 回答 4

2

情侣评论。

  1. 我认为这种设计很容易让人心痛。当您发现有人向您的 MR 集群部署了一个混乱的算法并且您必须回填一个月的档案时会发生什么?他们现在走了。当处理时间比预期的要长并且需要在旧工作完全完成之前开始新工作时会发生什么?存在太多文件,有些文件被重新处理。当档案仍在飞行中时,工作何时开始呢?等等。

  2. 摆脱这个陷阱的一种方法是让档案根据时间转到一个轮换位置,然后自己清除记录,或者(在 S3 之类的情况下)建立一个保留策略,允许特定的操作窗口。此外,无论后端 map reduce 处理正在做什么都可能是幂等的:处理相同的记录两次应该与处理一次没有什么不同。有些事情告诉我,如果您要减少数据集,那么该属性将难以保证。

  3. 至少您可以重命名您处理的文件,而不是立即删除它们,并使用 glob 表达式来定义不包括重命名文件的输入。正如我上面提到的,仍然存在竞争条件。

  4. 您可以使用 Amazon SQS 之类的队列来记录存档的交付,并且您的 InputFormat 可以提取这些条目,而不是在确定输入拆分时列出存档文件夹。但是,如果没有额外的基础设施,再处理或回填就会出现问题。

  5. 话虽如此,拆分列表是由 InputFormat 生成的。围绕它写一个装饰器,你可以在工作完成后将拆分列表存放在任何你想要的地方供主人使用。

于 2013-09-26T07:06:45.083 回答
1

最简单的方法可能是执行多输入作业,在运行作业之前读取文件的目录并将这些文件而不是目录传递给作业(然后在作业完成后删除列表中的文件)。

于 2013-09-26T06:25:42.197 回答
0

根据您所解释的情况,我可以建议以下解决方案:- 1.数据监控过程即监控档案登陆的目录应由单独的过程完成。该单独的进程可以使用一些元数据表(如 mysql)来放置基于监视目录的状态条目。元数据条目还可以检查重复性。2.现在基于元数据条目,一个单独的进程可以处理map reduce作业触发部分。可以在元数据中检查某些状态以触发作业。

于 2013-09-26T17:17:28.263 回答
0

我认为您应该使用Apache Oozie来管理您的工作流程。来自 Oozie 的网站(粗体字是我的):

Oozie 是一个用于管理 Apache Hadoop 作业的工作流调度系统。

...

Oozie Coordinator 作业是由时间(频率)和数据可用性触发的周期性 Oozie Workflow 作业。

于 2013-10-10T19:31:12.400 回答