1

我需要将从 HBASE 读取的 RDD 序列化到 alluxio 内存文件系统中,作为缓存和定期更新它以用于增量 SPARK 计算的方式。

代码是这样的,但遇到标题异常

val inputTableNameEvent = HBaseTables.TABLE_XXX.tableName
val namedeRDDName = "EventAllCached2Update"
val alluxioPath = "alluxio://hadoop1:19998/"
val fileURI = alluxioPath + namedeRDDName
val path:AlluxioURI = new AlluxioURI("/"+namedeRDDName)

val fs:FileSystem = FileSystem.Factory.get()

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, inputTableNameEvent)

val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
                classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
                classOf[org.apache.hadoop.hbase.client.Result])
numbers = rdd.count()
println("rdd count: " + numbers)
if( fs.exists(path))
       fs.delete(path)
rdd.saveAsObjectFile(fileURI)

谁能帮助告诉如何将 ImmutableBytesWritable 映射到另一种类型以绕过此问题?此外,地图需要是可恢复的,因为稍后我需要使用 objectFile 来读取这个保存的对象并将其转换为 [(ImmutableBytesWritable, Result)] RDD,以便稍后用于更新和计算。

4

1 回答 1

0

您需要将 rdd 转换为 row 对象。如下所示,然后将其保存到 hdfs。解析后的 RDD 就像现在有数据的任何其他 rdd

val parsedRDD = yourRDD.map(tuple => tuple._2).map(result => (
      Row((result.getRow.map(_.toChar).mkString),
      (result.getColumn("CF".getBytes(),"column1".getBytes()).get(0).getValue.map(_.toChar).mkString),
      (result.getColumn("CF".getBytes(),"column2".getBytes()).get(0).getValue.map(_.toChar).mkString),
      (result.getColumn("CF".getBytes(),"column3".getBytes()).get(0).getValue.map(_.toChar).mkString),
      (result.getColumn("CF".getBytes(),"column4".getBytes()).get(0).getValue.map(_.toChar).mkString),
      (result.getColumn("CF".getBytes(),"column5".getBytes()).get(0).getValue.map(_.toChar).mkString),
      (result.getColumn("CF".getBytes(),"column5".getBytes()).get(0).getValue.map(_.toChar).mkString)
      )))
于 2017-02-24T06:12:46.703 回答