I get an RDD[String] from a file:

val file = sc.textFile("/path/to/myData.txt")

The format of myData:

>str1_name
ATCGGKFKKVKKFKRLFFVLFLRL
FDJKALGFJVKRIKFKVKFGKLRL
...
FJDLALLLGL //the last line of str1
>str2_name
ATCGGKFKKVKKFKRLFFVLFLRL
FDJKALGFJVKRIKFKVKFGKLRL
...
FJDLALLLGL //the last line of str2
>str3_name
...

How should I transform the data from the file into the structure RDD[(String, String)]? For example,

trancRDD(
(str1_name, ATCGGKFKKVKKFKRLFFVLFLRLFDJKALGFJVKRIKFKVKFGKLRL), 
(str2_name, ATCGGKFKKVKKFKRLFFVLFLRLFDJKALGFJVKRIKFKVKFGKLRL),
...
)

1 Answer


If there is a well-defined record delimiter, such as the ">" shown above, this can be done with a custom Hadoop configuration:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val conf = new Configuration
conf.set("textinputformat.record.delimiter", ">")
// genome.txt contains the records provided in the question without the "..."
val dataset = sc.newAPIHadoopFile("./data/genome.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
val data = dataset.map(x=>x._2.toString)

Let's have a look at the data:

data.collect
res11: Array[String] = 
Array("", "str1_name
ATCGGKFKKVKKFKRLFFVLFLRL
FDJKALGFJVKRIKFKVKFGKLRL
FJDLALLLGL 
", "str2_name
ATCGGKFKKVKKFKRLFFVLFLRL
FDJKALGFJVKRIKFKVKFGKLRL
FJDLALLLGL
")

We can easily turn each of these strings into a record:

val records =  data.map{ multiLine => val lines = multiLine.split("\n"); (lines.head, lines.tail)}
records.collect
res14: Array[(String, Array[String])] = Array(("",Array()),
       (str1_name,Array(ATCGGKFKKVKKFKRLFFVLFLRL, FDJKALGFJVKRIKFKVKFGKLRL, FJDLALLLGL)),
       (str2_name,Array(ATCGGKFKKVKKFKRLFFVLFLRL, FDJKALGFJVKRIKFKVKFGKLRL, FJDLALLLGL)))

(Use a filter to drop the first, empty record... left as an exercise for the reader.)
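
For completeness, here is one possible way to do that exercise and also join the sequence lines into a single string, which is the RDD[(String, String)] shape the question asks for. This is only a sketch: `toRecord` is a hypothetical helper name that does not appear in the answer above, and the sample chunks are shortened stand-ins for the real data. It runs on plain Scala collections so it can be tried without a cluster.

```scala
// Hypothetical helper (not part of the original answer): turns one
// ">"-delimited chunk into an optional (name, sequence) pair.
def toRecord(multiLine: String): Option[(String, String)] = {
  // Split into lines, trim stray whitespace, and drop blank lines
  val lines = multiLine.split("\n").map(_.trim).filter(_.nonEmpty)
  // The empty chunk before the first ">" has no lines, so it yields None
  lines.headOption.map(name => (name, lines.tail.mkString))
}

// Shortened sample chunks, shaped like the output of data.collect
val chunks = Seq("", "str1_name\nATCG\nGKFK \n", "str2_name\nFJDL\n")

// flatMap keeps only the chunks that produced a record
val pairs = chunks.flatMap(toRecord)
```

On the RDD from the answer, the equivalent call would presumably be `data.flatMap(chunk => toRecord(chunk))`, which drops the empty leading record and concatenates the remaining lines in one pass.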

answered 2014-11-15T23:01:30.873