
How can I read all files from a directory on HDFS and process them using Scalding? For the local filesystem I use the following:

import com.twitter.scalding._
import com.twitter.scalding.JsonLine
import java.io._

class ParseJsonJob(args: Args) extends Job(args) {
  val fileList = new File(args("input")).listFiles
  val fields = ('device_guid
                ,'service_name
                ,'event_type
               )

  fileList.map {
    fileName =>
      JsonLine(fileName.toString, fields)
      .read
      .filter ('service_name) { name: String => name == "myservice" }
      .write(Tsv(args("output") + fileName.toString.split("/").last))
  }
}

That won't work with HDFS. Does TextLine or JsonLine read directories in addition to files?


2 Answers


import com.twitter.scalding._
import com.twitter.scalding.JsonLine

class ParseJsonJob(args: Args) extends Job(args) {
  val fields = ('device_guid, 'service_name, 'event_type)

  JsonLine(args("input"), fields)
    .read
    .filter('service_name) { name: String => name == "myservice" }
    .write(Tsv(args("output")))
}

That should work for you. Let me know if it doesn't.
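For context on why passing the directory straight to `JsonLine` works on HDFS: Scalding sources are backed by Cascading `Hfs` taps, and Hadoop's `FileInputFormat` treats a directory path as "every file in that directory"; Hadoop glob patterns are accepted as well. A hedged sketch (the `/*.json` pattern is an assumed file layout, not something from the question):

```scala
// Assumption: the JSON files sit directly under the input directory.
// Hadoop expands the glob, so every matching file feeds the same pipe.
JsonLine(args("input") + "/*.json", fields)
  .read
  .filter('service_name) { name: String => name == "myservice" }
  .write(Tsv(args("output")))
```

Note this reads all matching files into a single pipe and a single output, whereas the question's local-filesystem version writes one output per input file.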

answered 2015-03-13T20:17:27.463

You can get a Hadoop `FileSystem` and scan the HDFS directory with the `FileSystem.listStatus` primitive, e.g.:

...
val hadoopConf = implicitly[Mode] match {
  case Hdfs(_, conf) => conf
}
val fs = FileSystem.get(hadoopConf)
for (status <- fs.listStatus(new Path(args("input")))) {
  JsonLine(status.getPath.toString, fields)
    .read
    .filter('service_name) { name: String => name == "myservice" }
    .write(Tsv(args("output") + status.getPath.getName))
}
answered 2015-02-22T17:00:04.143
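Putting that snippet into a complete job, for reference. This is a sketch, not a tested implementation: it assumes the scalding and hadoop-client artifacts are on the classpath, that the job is run with `--hdfs` (otherwise the `Mode` match fails), and that the input directory contains only regular files:

```scala
import com.twitter.scalding._
import org.apache.hadoop.fs.{FileSystem, Path}

class ParseJsonHdfsJob(args: Args) extends Job(args) {
  val fields = ('device_guid, 'service_name, 'event_type)

  // Pull the Hadoop Configuration out of the current Mode;
  // this only matches when the job runs in --hdfs mode.
  val conf = implicitly[Mode] match {
    case Hdfs(_, c) => c
    case _          => sys.error("This job expects --hdfs mode")
  }
  val fs = FileSystem.get(conf)

  // One pipe per file, each written to its own Tsv output,
  // mirroring the per-file outputs of the local-filesystem version.
  for (status <- fs.listStatus(new Path(args("input")))) {
    JsonLine(status.getPath.toString, fields)
      .read
      .filter('service_name) { name: String => name == "myservice" }
      .write(Tsv(args("output") + status.getPath.getName))
  }
}
```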