3

我正在 scala 中编写一个 Spark 应用程序,并且想要处理一个脏输入文件。

// CSV file
val raw_data = sc.textFile(...)

val clean_data = raw_data.map(_.split(delimiter))
  .map( r => (r(0), r(1).toDouble)

当 r(1) 不是数字时,将抛出 NumberFormatException。这发生在丑陋的输入数据中的少数行上。

我终于找到了一种丑陋的方式来完成我所需要的:

import scala.util.control.Exception._

val clean_data = raw_data.map(_.split(delimiter))
  .map( r => (r(0),
        catching(classOf[NumberFormatException]).opt(r(1).toDouble))
  .filter( r => r._2 != None)
  .map( r => (r._1, r._2.get))

这给我留下了两个问题。

1)在地图中简单地删除格式错误的行的最佳方法是什么?

2)如何处理通过捕获创建的选项类型,而无需先显式过滤掉 None ,然后将 .get 函数映射并应用于非 None 选项值?

我尝试应用 .flatMap(identity) 步骤来摆脱 Nones,但得到了预期的:TraversableOnce[?] 异常。

4

1 回答 1

4

在 Sparkcollect(pf:PartialFunction)中,scala 集合的孪生兄弟collect正是为了这个目的而存在的:保留那些在偏函数中定义的集合元素。

val rawData = sc.textFile(...)

val cleanData = rawData.map(_.split(Delimiter))
             .collect{ case Array(x,y) if (Try(y.toDouble).isSuccess) (x,y.toDouble) }

另一个不评估.toDouble两次的选项是使用 flatMap:

val cleanData = rawData.map(_.split(Delimiter))
                       .flatMap(entry => Try(entry.toDouble).toOption)

注意:在 Spark 中有点令人困惑的是,有一个无参数collect方法旨在将数据从 RDD 获取到驱动程序。

于 2014-11-11T23:48:03.533 回答