JavaRDD<String> hbaseFile = jsc.textFile(HDFS_MASTER + HBASE_FILE);
JavaPairRDD<ImmutableBytesWritable, KeyValue> putJavaRDD = hbaseFile.mapToPair(line -> convertToKVCol1(line, COLUMN_AGE));
// sortByKey returns a new RDD and leaves putJavaRDD unchanged, so keep the result
JavaPairRDD<ImmutableBytesWritable, KeyValue> sortedJavaRDD = putJavaRDD.sortByKey(true);
sortedJavaRDD.saveAsNewAPIHadoopFile(stagingFolder, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, conf);

private static Tuple2<ImmutableBytesWritable, KeyValue> convertToKVCol1(String beanString, byte[] column) {
    InspurUserEntity inspurUserEntity = gson.fromJson(beanString, InspurUserEntity.class);
    String rowKey = inspurUserEntity.getDepartment_level1()+"_"+inspurUserEntity.getDepartment_level2()+"_"+inspurUserEntity.getId();
    return new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(rowKey)),
            new KeyValue(Bytes.toBytes(rowKey), COLUMN_FAMILY, column, Bytes.toBytes(inspurUserEntity.getAge())));
}

Above is my code. It only works for a single column per row key. Any ideas on how to create an HFile with multiple columns for one row key?

2 Answers

You have to use an array instead of ImmutableBytesWritable in the declaration.

answered 2017-09-22T07:00:07.740

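You can create multiple Tuple2<ImmutableBytesWritable, KeyValue> for one row, where the key stays the same and each KeyValue represents a single cell value. Make sure the columns are sorted lexicographically as well. You then call saveAsNewAPIHadoopFile on a JavaPairRDD<ImmutableBytesWritable, KeyValue>.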

    final JavaPairRDD<ImmutableBytesWritable, KeyValue> writables = myRdd.flatMapToPair(record -> {
        final List<Tuple2<ImmutableBytesWritable, KeyValue>> listToReturn = new ArrayList<>();
        // Add first column to the collection
        listToReturn.add(new Tuple2<ImmutableBytesWritable, KeyValue>(
                new ImmutableBytesWritable(Bytes.toBytes(record.getRowKey())),
                new KeyValue(Bytes.toBytes(record.getRowKey()), Bytes.toBytes("CF"),
                        Bytes.toBytes("COL1"), System.currentTimeMillis(),
                        Bytes.toBytes(record.getCol1()))));
        // Add subsequent columns, in lexicographic order of the qualifier
        listToReturn.add(new Tuple2<ImmutableBytesWritable, KeyValue>(
                new ImmutableBytesWritable(Bytes.toBytes(record.getRowKey())),
                new KeyValue(Bytes.toBytes(record.getRowKey()), Bytes.toBytes("CF"),
                        Bytes.toBytes("COL2"), System.currentTimeMillis(),
                        Bytes.toBytes(record.getCol2()))));
        // Spark 2.x expects an Iterator here; on Spark 1.x return listToReturn itself
        return listToReturn.iterator();
    });

Note: this is a major gotcha. You must also add the columns to the RDD in lexicographic order.

Essentially, the combination row key + column family + column qualifier must be sorted before you go on to write out the HFile.
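
To make that ordering concrete, here is a minimal sketch in plain Java, with no Spark or HBase dependencies. `CellCoord`, `compareBytes`, `CELL_ORDER`, and `sortedCells` are hypothetical names made up for illustration. The comparison treats bytes as unsigned, which to my knowledge matches how HBase's `Bytes.compareTo` orders byte arrays, but treat that as an assumption and check it against your HBase version. The sketch only covers the three components named above.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class CellOrderSketch {

    /** Hypothetical stand-in for a cell's coordinates; this is not an HBase class. */
    static final class CellCoord {
        final byte[] row;
        final byte[] family;
        final byte[] qualifier;

        CellCoord(String row, String family, String qualifier) {
            this.row = row.getBytes(StandardCharsets.UTF_8);
            this.family = family.getBytes(StandardCharsets.UTF_8);
            this.qualifier = qualifier.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String toString() {
            return new String(row, StandardCharsets.UTF_8) + "/"
                    + new String(family, StandardCharsets.UTF_8) + "/"
                    + new String(qualifier, StandardCharsets.UTF_8);
        }
    }

    /** Lexicographic comparison of two byte arrays, treating each byte as unsigned. */
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        // Equal up to the shorter length: the shorter array sorts first
        return a.length - b.length;
    }

    /** Orders by row key, then column family, then column qualifier. */
    static final Comparator<CellCoord> CELL_ORDER = (x, y) -> {
        int c = compareBytes(x.row, y.row);
        if (c != 0) {
            return c;
        }
        c = compareBytes(x.family, y.family);
        if (c != 0) {
            return c;
        }
        return compareBytes(x.qualifier, y.qualifier);
    };

    /** Returns the cells' string forms in the order they would need to be written. */
    static List<String> sortedCells(List<CellCoord> cells) {
        List<CellCoord> copy = new ArrayList<>(cells);
        copy.sort(CELL_ORDER);
        List<String> out = new ArrayList<>();
        for (CellCoord c : copy) {
            out.add(c.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        List<CellCoord> cells = Arrays.asList(
                new CellCoord("row2", "CF", "COL1"),
                new CellCoord("row1", "CF", "COL2"),
                new CellCoord("row1", "CF", "COL1"));
        // prints [row1/CF/COL1, row1/CF/COL2, row2/CF/COL1]
        System.out.println(sortedCells(cells));
    }
}
```

In the Spark job, the same idea applies to the pair RDD: every cell of a row has to arrive in this order, so sorting on the row key alone is not enough once a row carries more than one column.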

answered 2018-11-09T07:31:02.770