HBase 需要从一个集群中导出数据并导入到另一个集群中,对 row key 稍作修改
正如我在上面的帖子中提到的,需要从一个集群中导出表的 HBase 数据并通过根据我们的匹配模式更改行键将其导入另一个集群
在“org.apache.hadoop.hbase.mapreduce.Import”中,我们可以选择使用参数“HBASE_IMPORTER_RENAME_CFS”更改 ColumnFamily
我稍微修改了导入代码以支持行键更改。我的代码在 Pastebin https://pastebin.com/ticgeBb0中可用
使用以下代码更改了行键。
private static Cell convertRowKv(Cell kv, Map<byte[], byte[]> rowkeyReplaceMap) {
if (rowkeyReplaceMap != null) {
byte[] oldrowkeyName = CellUtil.cloneRow(kv);
String oldrowkey = Bytes.toString(oldrowkeyName);
Set<byte[]> keys = rowkeyReplaceMap.keySet();
for (byte[] key : keys) {
if (oldrowkey.contains(Bytes.toString(key))) {
byte[] newrowkeyName = rowkeyReplaceMap.get(key);
ByteBuffer buffer = ByteBuffer.wrap(oldrowkeyName);
buffer.get(key);
ByteBuffer newbuffer = buffer.slice();
ByteBuffer bb = ByteBuffer.allocate(newrowkeyName.length + newbuffer.capacity());
byte[] newrowkey = bb.array();
kv = new KeyValue(newrowkey, // row buffer
0, // row offset
newrowkey.length, // row length
kv.getFamilyArray(), // CF buffer
kv.getFamilyOffset(), // CF offset
kv.getFamilyLength(), // CF length
kv.getQualifierArray(), // qualifier buffer
kv.getQualifierOffset(), // qualifier offset
kv.getQualifierLength(), // qualifier length
kv.getTimestamp(), // timestamp
KeyValue.Type.codeToType(kv.getTypeByte()), // KV
// Type
kv.getValueArray(), // value buffer
kv.getValueOffset(), // value offset
kv.getValueLength()); // value length
}
}
}
return kv;
}
执行导入
hbase org.apache.hadoop.hbase.mapreduce.ImportWithRowKeyChange -DHBASE_IMPORTER_RENAME_ROW=123:123456 import file:///home/nshsh/export/
行键已成功更改。但是在将 Cell 放入 HBase 表时,使用“ org.apache.hadoop.hbase.client.Put.add(Cell) ”我们检查为
“ kv 的行与 put 相同,因为我们正在更改行键”
在这里它失败了。
然后我评论了 Put 类中的检查并更新了 hbase-client.jar。我也尝试编写扩展 Put 的 HBasePut
public class HBasePut extends Put {
public HBasePut(byte[] row) {
super(row);
// TODO Auto-generated constructor stub
}
public Put add(Cell kv) throws IOException{
byte [] family = CellUtil.cloneFamily(kv);
System.err.print(Bytes.toString(family));
List<Cell> list = getCellList(family);
//Checking that the row of the kv is the same as the put
/*int res = Bytes.compareTo(this.row, 0, row.length,
kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
if (res != 0) {
throw new WrongRowIOException("The row in " + kv.toString() +
" doesn't match the original one " + Bytes.toStringBinary(this.row));
}*/
list.add(kv);
familyMap.put(family, list);
return this;
}
}
在 Mapreduce 中,任务总是失败并出现以下异常
2020-07-24 13:37:15,105 WARN [htable-pool1-t1] hbase.HBaseConfiguration: Config option "hbase.regionserver.lease.period" is deprecated. Instead, use "hbase.client.scanner.timeout.period"
2020-07-24 13:37:15,122 INFO [LocalJobRunner Map Task Executor #0] client.AsyncProcess: , tableName=import
2020-07-24 13:37:15,178 INFO [htable-pool1-t1] client.AsyncProcess: #2, table=import, attempt=18/35 failed=7ops, last exception: org.apache.hadoop.hbase.client.WrongRowIOException: org.apache.hadoop.hbase.client.WrongRowIOException: The row in \x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00/vfrt:con/1589541180643/Put/vlen=225448/seqid=0 doesn't match the original one 123_abcf
at org.apache.hadoop.hbase.client.Put.add(Put.java:330)
at org.apache.hadoop.hbase.protobuf.ProtobufUtil.toPut(ProtobufUtil.java:574)
at org.apache.hadoop.hbase.regionserver.RSRpcServices.doBatchOp(RSRpcServices.java:744)
at org.apache.hadoop.hbase.regionserver.RSRpcServices.doNonAtomicRegionMutation(RSRpcServices.java:720)
at org.apache.hadoop.hbase.regionserver.RSRpcServices.multi(RSRpcServices.java:2168)
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:33656)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2196)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:133)
at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:108)
at java.lang.Thread.run(Thread.java:745)
我不知道任务中提到了旧的 Put 类。
有人可以帮忙解决这个问题。