I am testing my mapper with MRUnit. From the test class I pass a key and a list of values to the mapper as input. This is the setup:

String key = "1234_abc";
ArrayList<KeyValue> list = new ArrayList<KeyValue>();
KeyValue k1 = new KeyValue(Bytes.toBytes(key), "cf".getBytes(), "Val1".getBytes(), Bytes.toBytes("abc.com"));
KeyValue k2 = new KeyValue(Bytes.toBytes(key), "cf".getBytes(), "Val2".getBytes(), Bytes.toBytes("165"));
// Add both KeyValues to the list that backs the Result.
list.add(k1);
list.add(k2);
Result result = new Result(list);
mapDriver.withInput(key, result);

The problem is that only the first KeyValue is retained in the Result object. The others come back as null.

2 Answers

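The problem is that HBase stores columns in lexicographic order. It looks like the Result(KeyValue[] kvs) and Result(List<KeyValue> kvs) constructors expect the KeyValues to arrive in that same order.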

Here is the solution!

TreeSet<KeyValue> set = new TreeSet<KeyValue>(KeyValue.COMPARATOR);

byte[] row = Bytes.toBytes("row01");
byte[] cf = Bytes.toBytes("cf");
set.add(new KeyValue(row, cf, "cone".getBytes(), Bytes.toBytes("row01_cone_one")));
set.add(new KeyValue(row, cf, "ctwo".getBytes(), Bytes.toBytes("row01_ctwo_two")));
set.add(new KeyValue(row, cf, "cthree".getBytes(), Bytes.toBytes("row01_cthree_three")));
set.add(new KeyValue(row, cf, "cfour".getBytes(), Bytes.toBytes("row01_cfour_four")));
set.add(new KeyValue(row, cf, "cfive".getBytes(), Bytes.toBytes("row01_cfive_five")));
set.add(new KeyValue(row, cf, "csix".getBytes(), Bytes.toBytes("row01_csix_six")));

KeyValue[] kvs = new KeyValue[set.size()];
set.toArray(kvs);

Result result = new Result(kvs);
mapDriver.withInput(key, result);
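
To see which order the TreeSet above ends up with, here is a minimal sketch that needs no HBase on the classpath. It assumes qualifiers are compared byte by byte as unsigned values, which is my understanding of how HBase orders columns within one row and family. The class QualifierOrderDemo, the comparator UNSIGNED_LEXICOGRAPHIC and the method sortedQualifiers are hypothetical names made up for this illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper class for illustration; not part of HBase or MRUnit.
class QualifierOrderDemo {

    // Compares two byte arrays byte by byte, treating each byte as unsigned.
    // A shorter array that is a prefix of a longer one sorts first.
    static final Comparator<byte[]> UNSIGNED_LEXICOGRAPHIC = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    };

    // Returns the given qualifier names in sorted byte order.
    static List<String> sortedQualifiers(String... names) {
        TreeSet<byte[]> sorted = new TreeSet<>(UNSIGNED_LEXICOGRAPHIC);
        for (String name : names) {
            sorted.add(name.getBytes(StandardCharsets.UTF_8));
        }
        List<String> result = new ArrayList<>();
        for (byte[] bytes : sorted) {
            result.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return result;
    }

    public static void main(String[] args) {
        // Same qualifiers as in the answer, passed in insertion order.
        System.out.println(sortedQualifiers("cone", "ctwo", "cthree", "cfour", "cfive", "csix"));
        // prints [cfive, cfour, cone, csix, cthree, ctwo]
    }
}
```

The sorted order differs from the insertion order, which is why building the array by hand in insertion order can leave a Result whose lookups miss columns.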

Hope this helps!

Answered 2014-12-19T11:52:17.920

I just finished about 6 hours of pain on this issue myself and finally discovered the problem. It appears to be a bug in the org.apache.hadoop.hbase.client.Result class, at least for the version of HBase I am using (0.94.18).

// The below line of code was failing for me when running locally under MRUnit
// but it seemed to succeed when running in production on my cluster.
// org.apache.hadoop.hbase.client.Result result passed in to this method.
Bytes.toString(result.getValue(Constants.CF1, Constants.REG_STATUS_FLAG_BYTES));

result.getValue() calls getColumnLatest(), which contains a call to binarySearch(). The binarySearch() method seems to be faulty and almost always returns the wrong index. getColumnLatest() double-checks that it really did find the right KeyValue by making sure the family and qualifier match. They usually do not match, so it returns null.
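
One possible explanation, consistent with the other answer on this page, is that the search is fine but the array it runs on is not sorted. The sketch below is a textbook binary search over plain strings, not HBase's implementation; the class UnsortedBinarySearchDemo and its method are hypothetical and only show how a binary search can miss an element that is present when the input is out of order.

```java
import java.util.Arrays;

// Hypothetical demo class; a textbook binary search over strings, not HBase code.
class UnsortedBinarySearchDemo {

    // Returns the index of target in values, or -1 if the search does not find it.
    // The result is only meaningful when values is sorted in ascending order.
    static int binarySearch(String[] values, String target) {
        int low = 0;
        int high = values.length - 1;
        while (low <= high) {
            int mid = (low + high) >>> 1;
            int cmp = values[mid].compareTo(target);
            if (cmp < 0) {
                low = mid + 1;   // target can only be to the right
            } else if (cmp > 0) {
                high = mid - 1;  // target can only be to the left
            } else {
                return mid;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        String[] insertionOrder = {"cone", "ctwo", "cthree", "cfour", "cfive", "csix"};
        // "csix" is in the array, but the search is steered away from it.
        System.out.println(binarySearch(insertionOrder, "csix")); // prints -1

        String[] sorted = insertionOrder.clone();
        Arrays.sort(sorted);
        // Once the array is sorted the same search finds the element.
        System.out.println(binarySearch(sorted, "csix")); // prints 3
    }
}
```

If this is what happens inside Result, it would also explain why the same code works on a cluster, where the KeyValues presumably arrive from HBase already sorted.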

I ended up re-implementing the getValue() method and the 3 methods it uses, and then swapping over to the functionally correct implementation in my unit test. There may be a better way to achieve this, but it is late and this is what I came up with (and it does fix the problem):

// Usage: Pass the Result into the newly created getValue() method, rather than
// calling getValue() on the Result object.
Bytes.toString(getValue(result, Constants.CF1, Constants.REG_STATUS_FLAG_BYTES));

// Reimplemented Methods:
private byte[] getValue(Result result, byte [] family, byte [] qualifier) {
  KeyValue kv = getColumnLatest(result, family, qualifier);
  if (kv == null) {
    return null;
  }
  return kv.getValue();
}

private KeyValue getColumnLatest(Result result,  byte[] family, byte[] qualifier) {    
  KeyValue [] kvs = result.raw(); // side effect possibly.
  if (kvs == null || kvs.length == 0) {
    return null;
  }
  //int pos = binarySearch(kvs, family, qualifier);
  int pos = linearSearch(kvs, family, qualifier);
  if (pos == -1) {
    return null;
  }
  KeyValue kv = kvs[pos];
  if (kv.matchingColumn(family, qualifier)) {
    return kv;
  }
  return null;
}

private int linearSearch(final KeyValue [] kvs, final byte [] family,
  final byte [] qualifier) {

  int pos = -1;
  int index = 0;
  for (KeyValue kv : kvs) {
    if (byteArraysEqual(family, kv.getFamily()) && byteArraysEqual(qualifier, kv.getQualifier())) {
      pos = index;
      break;
    }
    index++;
  }
  return pos;
}

private boolean byteArraysEqual(final byte[] ba1, final byte[] ba2) {    
  if (ba1 == null || ba2 == null) {
    return false;
  }

  if (ba1.length != ba2.length) {
    return false;
  }

  for (int i = 0; i < ba1.length; i++) {
    if (ba1[i] != ba2[i]) {
      return false;
    }
  }

  return true;
}

Answered 2014-12-11T10:35:53.387