很抱歉在 hadoop 用户邮件列表和此处交叉发布此内容,但这对我来说是个紧急问题。
我的问题如下:我有两个输入文件,我想确定
- a) 仅出现在文件 1 中的行数
- b) 仅在文件 2 中出现的行数
- c) 两者共有的行数(例如关于字符串相等性)
例子:
File 1:
a
b
c
File 2:
a
d
每种情况的所需输出:
lines_only_in_1: 2 (b, c)
lines_only_in_2: 1 (d)
lines_in_both: 1 (a)
基本上我的方法如下:我编写了自己的 LineRecordReader,以便映射器接收由行(文本)和指示源文件(0 或 1)的字节组成的对。映射器仅再次返回该对,因此实际上它什么也不做。然而,副作用是,组合器收到一个
Map<Line, Iterable<SourceId>>
(其中 SourceId 为 0 或 1)。
现在,对于每一行,我可以获得它出现的源集。因此,我可以编写一个组合器,计算每种情况(a、b、c)的行数(清单 1)
然后,组合器仅在清理时输出“摘要”(安全吗?)。所以这个总结看起来像:
lines_only_in_1 2531
lines_only_in_2 3190
lines_in_both 901
然后,在减速器中,我只总结这些摘要的值。(因此reducer 的输出看起来就像combiner 的输出)。
但是,主要问题是,我需要将两个源文件视为一个单独的虚拟文件,该文件会产生格式为 (line, sourceId) // sourceId 0 或 1 的记录
我不确定如何实现这一目标。所以问题是我是否可以避免预先处理和合并文件,并使用虚拟合并文件阅读器和自定义记录阅读器之类的东西即时进行。非常感谢任何代码示例。
最好的问候,克劳斯
清单 1:
public static class SourceCombiner
extends Reducer<Text, ByteWritable, Text, LongWritable> {
private long countA = 0;
private long countB = 0;
private long countC = 0; // C = lines (c)ommon to both sources
@Override
public void reduce(Text key, Iterable<ByteWritable> values, Context context) throws IOException, InterruptedException {
Set<Byte> fileIds = new HashSet<Byte>();
for (ByteWritable val : values) {
byte fileId = val.get();
fileIds.add(fileId);
}
if(fileIds.contains((byte)0)) { ++countA; }
if(fileIds.contains((byte)1)) { ++countB; }
if(fileIds.size() >= 2) { ++countC; }
}
protected void cleanup(Context context)
throws java.io.IOException, java.lang.InterruptedException
{
context.write(new Text("in_a_distinct_count_total"), new LongWritable(countA));
context.write(new Text("in_b_distinct_count_total"), new LongWritable(countB));
context.write(new Text("out_common_distinct_count_total"), new LongWritable(countC));
}
}