
HBase: need to export data from one cluster and import it into another, with a slight change to the row keys

As mentioned in the title, I need to export a table's HBase data from one cluster and import it into another cluster, changing the row keys according to our mapping pattern.

In "org.apache.hadoop.hbase.mapreduce.Import" there is already an option to rename column families during import, via the argument "HBASE_IMPORTER_RENAME_CFS".
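For comparison, a plain column-family rename with the stock Import tool would be invoked something like this (oldCf/newCf are placeholders; the table name and export path mirror the command I use further below):

hbase org.apache.hadoop.hbase.mapreduce.Import -DHBASE_IMPORTER_RENAME_CFS=oldCf:newCf import file:///home/nshsh/export/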

I modified the Import code slightly to support changing the row key. My code is available on Pastebin: https://pastebin.com/ticgeBb0

The row key is changed with the following code:

import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

private static Cell convertRowKv(Cell kv, Map<byte[], byte[]> rowkeyReplaceMap) {
        if (rowkeyReplaceMap != null) {
            byte[] oldrowkeyName = CellUtil.cloneRow(kv);
            String oldrowkey = Bytes.toString(oldrowkeyName);
            for (Map.Entry<byte[], byte[]> entry : rowkeyReplaceMap.entrySet()) {
                byte[] key = entry.getKey();
                // The replacement below assumes the old pattern is a prefix of the row key.
                if (oldrowkey.startsWith(Bytes.toString(key))) {
                    byte[] newrowkeyName = entry.getValue();
                    // Skip the old prefix and keep the remainder of the row key.
                    ByteBuffer buffer = ByteBuffer.wrap(oldrowkeyName);
                    buffer.position(key.length);
                    ByteBuffer newbuffer = buffer.slice();
                    // Compose the new row key: new prefix + remainder.
                    ByteBuffer bb = ByteBuffer.allocate(newrowkeyName.length + newbuffer.capacity());
                    bb.put(newrowkeyName);
                    bb.put(newbuffer);
                    byte[] newrowkey = bb.array();
                    kv = new KeyValue(newrowkey, // row buffer
                            0, // row offset
                            newrowkey.length, // row length
                            kv.getFamilyArray(), // CF buffer
                            kv.getFamilyOffset(), // CF offset
                            kv.getFamilyLength(), // CF length
                            kv.getQualifierArray(), // qualifier buffer
                            kv.getQualifierOffset(), // qualifier offset
                            kv.getQualifierLength(), // qualifier length
                            kv.getTimestamp(), // timestamp
                            KeyValue.Type.codeToType(kv.getTypeByte()), // KV type
                            kv.getValueArray(), // value buffer
                            kv.getValueOffset(), // value offset
                            kv.getValueLength()); // value length
                }
            }
        }
        return kv;
    }

Running the import:

hbase org.apache.hadoop.hbase.mapreduce.ImportWithRowKeyChange -DHBASE_IMPORTER_RENAME_ROW=123:123456 import file:///home/nshsh/export/
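The value of -DHBASE_IMPORTER_RENAME_ROW (here 123:123456, i.e. replace the prefix 123 with 123456) has to be parsed into the rowkeyReplaceMap used above. A minimal sketch of that parsing, modeled on Import.createCfRenameMap; the helper name and property handling here are my assumptions, the full code is on Pastebin:

import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;

// Sketch: turn "old1:new1,old2:new2" into the byte[] -> byte[] replacement map.
private static Map<byte[], byte[]> createRowkeyReplaceMap(Configuration conf) {
    Map<byte[], byte[]> rowkeyReplaceMap = null;
    String allMappingsPropVal = conf.get("HBASE_IMPORTER_RENAME_ROW");
    if (allMappingsPropVal != null) {
        // TreeMap with BYTES_COMPARATOR so byte[] keys compare by content.
        rowkeyReplaceMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        for (String mapping : allMappingsPropVal.split(",")) {
            String[] parts = mapping.split(":");
            rowkeyReplaceMap.put(Bytes.toBytes(parts[0]), Bytes.toBytes(parts[1]));
        }
    }
    return rowkeyReplaceMap;
}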

The row key gets changed successfully. But when the Cell is put into the HBase table, "org.apache.hadoop.hbase.client.Put.add(Cell)" checks that the row of the kv is the same as the Put's row, and because we are changing the row key, it fails there.

So I commented out that check in the Put class and updated hbase-client.jar. I also tried writing an HBasePut that extends Put:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HBasePut extends Put {

    public HBasePut(byte[] row) {
        super(row);
    }

    @Override
    public Put add(Cell kv) throws IOException {
        byte[] family = CellUtil.cloneFamily(kv);
        System.err.print(Bytes.toString(family));
        List<Cell> list = getCellList(family);
        // The original check that the row of the kv matches the Put's row
        // is disabled, because we are deliberately changing the row key:
        /*int res = Bytes.compareTo(this.row, 0, row.length,
            kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
        if (res != 0) {
          throw new WrongRowIOException("The row in " + kv.toString() +
            " doesn't match the original one " + Bytes.toStringBinary(this.row));
        }*/
        list.add(kv);
        familyMap.put(family, list);
        return this;
    }
}
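In the modified Importer's mapper, the Put is then built as an HBasePut so the relaxed add(Cell) is used. A rough sketch of the call site (variable names are my assumptions, not the Pastebin code):

// Inside map() of the modified Importer (sketch):
Cell converted = convertRowKv(kv, rowkeyReplaceMap);
Put put = new HBasePut(CellUtil.cloneRow(converted));
put.add(converted);
context.write(new ImmutableBytesWritable(CellUtil.cloneRow(converted)), put);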

In the MapReduce job, the tasks always fail with the following exception:

2020-07-24 13:37:15,105 WARN  [htable-pool1-t1] hbase.HBaseConfiguration: Config option "hbase.regionserver.lease.period" is deprecated. Instead, use "hbase.client.scanner.timeout.period"
2020-07-24 13:37:15,122 INFO  [LocalJobRunner Map Task Executor #0] client.AsyncProcess: , tableName=import
2020-07-24 13:37:15,178 INFO  [htable-pool1-t1] client.AsyncProcess: #2, table=import, attempt=18/35 failed=7ops, last exception: org.apache.hadoop.hbase.client.WrongRowIOException: org.apache.hadoop.hbase.client.WrongRowIOException: The row in \x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00/vfrt:con/1589541180643/Put/vlen=225448/seqid=0 doesn't match the original one 123_abcf
    at org.apache.hadoop.hbase.client.Put.add(Put.java:330)
    at org.apache.hadoop.hbase.protobuf.ProtobufUtil.toPut(ProtobufUtil.java:574)
    at org.apache.hadoop.hbase.regionserver.RSRpcServices.doBatchOp(RSRpcServices.java:744)
    at org.apache.hadoop.hbase.regionserver.RSRpcServices.doNonAtomicRegionMutation(RSRpcServices.java:720)
    at org.apache.hadoop.hbase.regionserver.RSRpcServices.multi(RSRpcServices.java:2168)
    at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:33656)
    at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2196)
    at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
    at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:133)
    at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:108)
    at java.lang.Thread.run(Thread.java:745)

I don't understand why the task is still hitting the old Put class (the stack trace shows the check firing in ProtobufUtil.toPut inside RSRpcServices, on the region server side).

Can anyone help with this issue?
