我在 hbase 中有以下格式的日志数据。
hbase源表
---------------------
date(table key) word count
---------------------
2013/09/25 apple 5
2013/09/25 mangoes 2
2013/09/25 oranges 6
2013/09/25 apple 2
2013/09/25 mangoes 3
2013/09/25 mangoes 1
dest 表(在目标表中,在 2013/09/25 运行 mapreduce 后,添加 word 作为 key 和 count 的总和作为 column.data)
------------------
word(table key) count
------------------
apple 7
oranges 6
mangoes 6
数据将每天添加到源表中。但我不想对所有源表数据进行 map reduce。所以我试着只为那天添加的数据做 map reduce。
2013 年 9 月 26 日添加了新数据的源表。
---------------------
date(table key) word count
---------------------
2013/09/25 apple 5
2013/09/25 mangoes 2
2013/09/25 oranges 6
2013/09/25 apple 2
2013/09/25 mangoes 3
2013/09/25 mangoes 1
2013/09/26 apple 10
2013/09/26 oranges 20
当我只为 2013/09/26 数据做 mapreduce 时,我在 dest 表中得到以下内容。
包含新数据的 dest 表(由于键相同,因此苹果和橙子的计数将使用 2013/09/26 数据进行更新。截至 2013/09/25 的旧数据已消失):
------------------
word(table key) count
------------------
apple 10
oranges 10
mangoes 6
预期的目标表:
------------------
word(table key) count
------------------
apple 17
oranges 16
mangoes 6
我可以映射减少部分数据并将计数添加到目标表计数列还是每次都需要映射减少所有数据?
如果我可以映射减少部分数据并更新计数,我该怎么做。这是我的地图减少功能。
地图功能:
public void map(ImmutableBytesWritable row,Result value,Context context) throws IOException {
ImmutableBytesWritable key = new ImmutableBytesWritable(row.get());
String cf = "data";
String column1 = "word";
String column2 = "count";
String word = new String(result.getValue(Bytes.toBytes(cf),Bytes.toBytes(column1)));
Text t = new Text(word);
context.write(t,value);
}
减少功能:
public void reduce(Text key,Iterable<Result> values,Context context) throws IOException,InterruptedException {
int count=0;
String cf = "data";
String column = "count";
for(Result val :values) {
int d = Integer.parseInt(new String(result.getValue(Bytes.toBytes(cf),Bytes.toBytes(column))))
count += d;
}
Put put = new Put(Bytes.toBytes(key.toString()));
put.add(cf.getBytes(), column.getBytes(), String.valueOf(count).getBytes());
context.write(null, put);
}