5

我有大约 200 万条记录,每个记录大约有 4 个字符串字段,需要检查是否有重复项。更具体地说,我有姓名、电话、地址和父亲姓名作为字段,我必须使用所有这些字段和其余数据检查重复数据删除。生成的唯一记录需要记录到 db.xml 中。

我已经能够实现mapreduce,迭代所有记录。任务速率设置为 100/s,存储桶大小设置为 100。启用计费。

目前,一切正常,但性能非常缓慢。我已经能够在 6 小时内完成 10,000 条记录的测试数据集中仅 1000 条记录的重复数据删除处理。

java中的当前设计是:

  1. 在每次地图迭代中,我都会将当前记录与之前的记录进行比较
  2. 上一条记录是 db 中的一条记录,它就像一个全局变量,我在每次映射迭代中用另一个以前的记录覆盖它
  3. 使用算法进行比较,并将结果作为新实体写入 db
  4. 在一个 Mapreduce 作业结束时,我以编程方式创建另一个作业
  5. 上一个记录变量有助于作业将下一个候选记录与其余数据进行比较

我准备增加任何数量的 GAE 资源以在最短的时间内实现这一目标。

我的问题是:

  1. 重复数据删除(检查重复项)的准确性是否会因并行作业/任务而受到影响?
  2. 如何改进这种设计?
  3. 这是否会扩展到 2000 万条记录
  4. 在映射迭代期间读取/写入变量(不仅仅是计数器)的最快方法是什么,可以在一个 mapreduce 作业中使用。

最欢迎自由职业者在这方面提供帮助。

谢谢你的帮助。

4

5 回答 5

4

您应该利用 Reducer 对每个字段执行相当于 sort -u 的操作。您需要在每个字段中完成一项 M/R 工作。您将在映射器中创建要比较键的字段,然后在化简器中将所有具有相同名称的记录分组在一起,您可以标记它们。第二遍将用于电话等。根据您的集群大小,每个遍应该非常快。

编辑:@Olaf 指出 OP 可能想要完全独特的记录。使用多部分密钥,这可能是获取唯一集的单行 hadoop 流命令。我会尽快补充的。

Edit2:承诺的流式传输命令,将对整个文件执行 sort -u。这假设您有一个文件,其中包含每个字段(姓名、父亲姓名、电话号码和地址)的记录,每行一个制表符分隔在 dir hdfs://example/dedup/input/ 中的一个或多个文件中。实际的 hdfs 路径可以是任何东西,或者您可以使用单个文件。输出将是 hdfs://example/dedup/output/ 中的多个 part-* 文件。您可能还需要更改命令,因为您的 hadoop-streaming.jar 可能位于稍微不同的位置。如果您有超过 4 个字段,请更改 stream.num.map.output.key.fields 的值。

   $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input hdfs://example/dedup/input/ -output hdfs://example/dedup/output/ \
-mapper org.apache.hadoop.mapred.lib.IdentityMapper \
-reducer /usr/bin/uniq \
-D stream.num.map.output.key.fields=4

要将唯一结果检索到本地文件系统文件中的文件,请运行以下命令:

    $HADOOP_HOME/bin/hadoop fs -cat \
 'hdfs://example/dedup/output/part-*' > results.txt

需要注意的是,由于每一列都是一个键,流式传输将添加一个空值,因此每一行最后都会有一个额外的选项卡。那很容易被剥离。

如果您想做的不仅仅是获得 uniq 输出,您可以放置​​自己的 java 类或命令行程序,而不是使用 /usr/bin/uniq。例如,该类可以通过在输入中添加第五列即记录数据库 ID 来更新您发现重复的所有记录。默认情况下,Hadoop 按整个键对结果进行分区,因此每组重复记录将通过一个 reducer 一起流式传输,这一切都将并行发生。请查看流媒体文档以获取更多信息。

于 2011-07-21T03:16:19.697 回答
3

我看到了解决这个问题的两种方法:

  1. (如果您只需要执行一次)AppEngine 会为您的实体中的每个属性创建一个属性索引(除非您要求它不要这样做)。创建一个后端,使用游标批量运行查询“SELECT * FROM ORDER BY”,确定重复的属性并修复/删除它们。您可能可以并行化它,但在分片边界上很棘手,您可能必须自己编写所有代码。

  2. 您可以使用映射器框架来减慢速度,但可以并行运行。这种方法还允许您在插入时有效地删除重复数据。引入一个新实体来保存唯一的属性值。说“唯一电话号码”。该实体应持有一个电话号码作为键,并引用该电话号码的实体。现在运行地图并查找 UniquePhoneNumber。如果找到并且其引用有效,请删除重复项。如果没有正确引用创建一个新的。这样,如果需要,甚至可以重新指向另一个引用。确保您阅读 UniquePhoneNumber 并在单个事务中创建一个新的/更新一个新的。否则不会检测到重复项。

于 2011-07-21T18:28:15.670 回答
1

为每条记录生成一个哈希码。Set循环遍历您的记录并根据哈希码将每个记录插入到 a中。现在Set是 O(N) 中的重复数据删除列表。

于 2011-07-21T03:21:29.427 回答
1

你绝对不应该使用你当前的方法——一次只有一个进程可以更新一个实体,所以你的整个 mapreduce 在那个实体上是瓶颈。此外,mapreduce 目前不允许您指定结果集的顺序,因此您无法保证会找到所有(甚至大部分)重复项。

目前,您最好的选择可能是构建自己的解决方案。使用游标,对按要删除重复的字段排序的类型执行查询,然后扫描它,检查重复并在遇到重复时将其删除(分批,以减少 RPC)。当您需要链接另一个任务时(由于 10 分钟的任务限制),请使用光标确保新任务从您离开的地方继续。

如果你想并行化这个,你可以通过让每个分片开始跳过记录,直到它发现你要删除重复数据的值发生变化,然后从那里开始。在分片的末尾,等到您到达组的末尾后再停止。这样,您可以确保不会错过删除位于分片边界边缘的重复项。

于 2011-07-21T05:06:47.010 回答
0

这是一个基于哈希自连接和 Map Reduce 的解决方案。它还可以使用编辑距离算法进行模糊重复匹配。您可以从记录中选择要用于重复检测的字段。reducer 会输出一个重复的分数。

https://pkghosh.wordpress.com/2013/09/09/identifying-duplicate-records-with-fuzzy-matching/

于 2015-05-09T01:30:48.287 回答