如何RDD[Array[Byte]]
使用 Apache Spark 写入文件并再次读取?
问问题
12059 次
2 回答
14
常见的问题似乎是一个奇怪的无法将异常从 BytesWritable 转换为 NullWritable。另一个常见问题是 BytesWritablegetBytes
是一堆毫无意义的废话,根本没有得到字节。什么getBytes
是得到你的字节而不是在最后添加大量的零!你必须使用copyBytes
val rdd: RDD[Array[Byte]] = ???
// To write
rdd.map(bytesArray => (NullWritable.get(), new BytesWritable(bytesArray)))
.saveAsSequenceFile("/output/path", codecOpt)
// To read
val rdd: RDD[Array[Byte]] = sc.sequenceFile[NullWritable, BytesWritable]("/input/path")
.map(_._2.copyBytes())
于 2014-06-06T13:42:55.510 回答
0
这是一个包含所有必需导入的片段,您可以根据@Choix 的要求从 spark-shell 运行它们
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.NullWritable
val path = "/tmp/path"
val rdd = sc.parallelize(List("foo"))
val bytesRdd = rdd.map{str => (NullWritable.get, new BytesWritable(str.getBytes) ) }
bytesRdd.saveAsSequenceFile(path)
val recovered = sc.sequenceFile[NullWritable, BytesWritable]("/tmp/path").map(_._2.copyBytes())
val recoveredAsString = recovered.map( new String(_) )
recoveredAsString.collect()
// result is: Array[String] = Array(foo)
于 2019-07-27T22:17:00.377 回答