全部:
最近,我在Hbase(0.94.17)中写了一个协处理器,A类扩展了BaseEndpointCoprocessor,一个行计数方法来计算一个表的行数。
我遇到了一个问题。
如果我没有在扫描中设置过滤器,我的代码适用于两个表。一个表有 1,000,000 行,另一个有 160,000,000 行。计算更大的桌子大约需要 2 分钟。
但是,如果我在扫描中设置过滤器,它只适用于小表。它会在更大的桌子上抛出异常。org.apache.hadoop.hbase.ipc.ExecRPCInvoker$1@2c88652b,java.io.IOException:java.io.IOException:java.lang.IndexOutOfBoundsException:索引:0,大小:0
相信我,我一遍又一遍地检查我的代码。
所以,要用过滤器计算我的表,我必须编写以下愚蠢的代码,首先,我没有在扫描中设置过滤器,然后,在我得到一条记录后,我编写了一个过滤它的方法。
它适用于两张桌子。
但我不知道为什么。
我尝试阅读 HRegion.java 中的扫描仪源代码,但是我没有得到它。
所以,如果你知道答案,请帮助我。谢谢你。
@Override
public long rowCount(Configuration conf) throws IOException {
// TODO Auto-generated method stub
Scan scan = new Scan();
parseConfiguration(conf);
Filter filter = null;
if (this.mFilterString != null && !mFilterString.equals("")) {
ParseFilter parse = new ParseFilter();
filter = parse.parseFilterString(mFilterString);
// scan.setFilter(filter);
}
scan.setCaching(this.mScanCaching);
InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion().getScanner(scan);
long sum = 0;
try {
List<KeyValue> curVals = new ArrayList<KeyValue>();
boolean hasMore = false;
do {
curVals.clear();
hasMore = scanner.next(curVals);
if (filter != null) {
filter.reset();
if (HbaseUtil.filterOneResult(curVals, filter)) {
continue;
}
}
sum++;
} while (hasMore);
} finally {
scanner.close();
}
return sum;
}
以下是我的 hbase util 代码:
public static boolean filterOneResult(List<KeyValue> kvList, Filter filter) {
if (kvList.size() == 0)
return true;
KeyValue kv = kvList.get(0);
if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
return true;
}
for (KeyValue kv2 : kvList) {
if (filter.filterKeyValue(kv2) == Filter.ReturnCode.NEXT_ROW) {
return true;
}
}
filter.filterRow(kvList);
if (filter.filterRow())
return true;
else
return false;
}