2

我有一个数据库表的两个版本(旧/新),大约有 100,000,000 条记录。它们在文件中:

trx-old
trx-new

结构是:

id date amount memo
1  5/1     100 slacks
2  5/1      50 wine

id 是简单的主键,其他字段是非键。我想生成三个文件:

trx-removed (ids of records present in trx-old but not in trx-new)
trx-added   (records from trx-new whose ids are not present in trx-old)
trx-changed (records from trx-new whose non-key values have changed since trx-old)

我需要每天在一个短批处理窗口中执行此操作。实际上,我需要为多个表和多个模式(为每个模式生成三个文件)执行此操作,因此实际的应用程序涉及更多。但我认为这个例子抓住了问题的症结所在。

这感觉像是一个明显的 mapreduce 应用程序。从未编写过 mapreduce 应用程序,我的问题是:

  1. 是否有一些 EMR 应用程序已经这样做了?
  2. 是否存在明显的 Pig 或 Cascading 解决方案?
  3. 还有其他一些非常接近于此的开源示例吗?

PS 我看到了表格问题之间的差异,但那里的解决方案看起来没有可扩展性。

PPS 这是一个演示算法的小 Ruby 玩具:Ruby dbdiff

4

4 回答 4

2

我认为编写自己的工作是最简单的,主要是因为当典型的 reducer 只写入一个文件时,您将希望使用 MultipleOutputs 从单个 reduce 步骤写入三个单独的文件。您需要使用 MultipleInputs 为每个表指定一个映射器。

于 2013-05-04T20:05:51.187 回答
1

正如@ChrisGerken 建议的那样,您必须使用MultipleOutputsandMultipleInputs来生成多个输出文件并将自定义映射器与每个输入文件类型(旧/新)相关联。

映射器将输出:

  • 键:主键(id)
  • 值:来自输入文件的记录,带有附加标志(新/旧取决于输入)

reducer 将遍历R每个键和输出的所有记录:

  • 删除文件:如果只存在带有标记 old 的记录。
  • 添加文件:如果只存在带有新标志的记录。
  • 更改文件:如果记录R不同。

由于该算法会随着 reducer 的数量而扩展,因此您很可能需要第二个作业,它将结果合并到一个文件中以进行最终输出。

于 2013-05-05T20:00:52.010 回答
1

我想到的是:

考虑你的表是这样的:

Table_old
1    other_columns1
2    other_columns2
3    other_columns3

Table_new 
2    other_columns2
3    other_columns3
4    other_columns4

追加 table_old 的元素“a”和 table_new 的元素“b”。

当您合并两个文件时,如果一个元素存在于第一个文件中而不是第二个文件中,则会将其删除

table_merged
1a    other_columns1
2a    other_columns2
2b    other_columns2
3a    other_columns3
3b    other_columns3
4a    other_columns4

从该文件中,您可以轻松地进行操作。

另外,假设您的 id 是 n 位数字,并且您有 10 个集群+1 个主节点。您的密钥将是 id 的第一个数字,因此,您将数据均匀地划分为集群。您将进行分组+分区,以便对您的数据进行排序。

例子,

table_old
1...0 data
1...1 data
2...2 data

table_new
1...0 data
2...2 data
3...2 data

您的密钥是第一个数字,您根据该数字进行分组,您的分区是根据 ID 的其余部分。然后您的数据将作为

worker1
1...0b data
1...0a data
1...1a data

worker2 
2...2a data
2...2b data and so on.

请注意,a,b 不必排序。

编辑 合并将是这样的:

FileInputFormat.addInputPath(job, new Path("trx-old"));
FileInputFormat.addInputPath(job, new Path("trx-new"));

MR 将得到两个输入,两个文件将被合并,

对于附加部分,您应该在 Main MR 之前再创建两个作业,这将只有Map. 第一个Map意志append "a"对第一个列表中的每个元素,第二个意志append "b"对第二个列表的元素。第三个工作(我们现在使用的那个/主地图)将只有减少工作来收集它们。所以你会有Map-Map-Reduce.

追加可以这样完成

//you have key:Text
new Text(String.valueOf(key.toString()+"a"))

但我认为可能有不同的附加方式,其中一些可能在(text hadoop)中更有效

希望对您有所帮助,

于 2013-05-04T20:35:18.823 回答
1

这似乎是级联解决的完美问题。您已经提到您从未编写过 MR 应用程序,如果您的目的是快速入门(假设您熟悉 Java),那么 Cascading 是您的方式,恕我直言。稍后我将详细介绍这一点。

可以使用 Pig 或 Hive,但如果您想对这些列执行额外的分析或更改架构,它们就不够灵活,因为您可以通过读取列标题或映射文件在 Cascading 中动态构建架构您创建来表示架构。

Cascading你会:

  1. 设置您的传入Taps:点击 trxOld 并点击 trxNew(这些指向您的源文件)
  2. 将您的水龙头连接到Pipes:Pipe oldPipe 和 Pipe newPipe
  3. 设置您的传出Taps:点击 trxRemoved、点击 trxAdded 和点击 trxChanged
  4. 建立你的管道分析(这是有趣(伤害)发生的地方)

trx-removed : 添加了 trx

Pipe trxOld = new Pipe ("old-stuff");
Pipe trxNew = new Pipe ("new-stuff");
//smallest size Pipe on the right in CoGroup
Pipe oldNnew = new CoGroup("old-N-new", trxOld, new Fields("id1"), 
                                       trxNew, new Fields("id2"), 
                                       new OuterJoin() ); 

外部连接为我们提供了 NULLS,其中另一个管道(您的源数据)中缺少 id,因此我们可以使用FilterNotNullFilterNull在下面的逻辑中为我们获取最终管道,然后我们相应地连接到 Tap trxRemoved 和 Tap trxAdded。

trx 改变

在这里,我将首先连接您正在寻找使用更改的字段,FieldJoiner然后使用 anExpressionFilter为我们提供僵尸(因为它们已更改),例如:

Pipe valueChange = new Pipe("changed");
valueChange = new Pipe(oldNnew, new Fields("oldValues", "newValues"), 
            new ExpressionFilter("oldValues.equals(newValues)", String.class),
            Fields.All);

它的作用是过滤掉具有相同值的字段并保留差异。此外,如果上面的表达式为真,它将删除该记录。最后,将您的 valueChange 管道连接到 Tap trxChanged,您将拥有三个输出,其中包含您正在寻找的所有数据,其中包含允许一些附加分析的代码。

于 2013-05-04T20:56:28.400 回答