9

如何RDD[Array[Byte]]使用 Apache Spark 写入文件并再次读取?

4

2 回答 2

14

常见的问题似乎是一个奇怪的无法将异常从 BytesWritable 转换为 NullWritable。另一个常见问题是 BytesWritablegetBytes是一堆毫无意义的废话,根本没有得到字节。什么getBytes是得到你的字节而不是在最后添加大量的零!你必须使用copyBytes

val rdd: RDD[Array[Byte]] = ???

// To write
rdd.map(bytesArray => (NullWritable.get(), new BytesWritable(bytesArray)))
  .saveAsSequenceFile("/output/path", codecOpt)

// To read
val rdd: RDD[Array[Byte]] = sc.sequenceFile[NullWritable, BytesWritable]("/input/path")
  .map(_._2.copyBytes())
于 2014-06-06T13:42:55.510 回答
0

这是一个包含所有必需导入的片段,您可以根据@Choix 的要求从 spark-shell 运行它们

import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.NullWritable

val path = "/tmp/path"

val rdd = sc.parallelize(List("foo"))
val bytesRdd = rdd.map{str  =>  (NullWritable.get, new BytesWritable(str.getBytes) )  }
bytesRdd.saveAsSequenceFile(path)

val recovered = sc.sequenceFile[NullWritable, BytesWritable]("/tmp/path").map(_._2.copyBytes())
val recoveredAsString = recovered.map( new String(_) )
recoveredAsString.collect()
// result is:  Array[String] = Array(foo)
于 2019-07-27T22:17:00.377 回答