5

我正在映射一个 HBase 表,每个 HBase 行生成一个 RDD 元素。但是,有时该行有错误的数据(在解析代码中抛出 NullPointerException),在这种情况下我只想跳过它。

我让我的初始映射器返回 anOption以指示它返回 0 或 1 个元素,然后过滤Some,然后获取包含的值:

// myRDD is RDD[(ImmutableBytesWritable, Result)]
val output = myRDD.
  map( tuple => getData(tuple._2) ).
  filter( {case Some(y) => true; case None => false} ).
  map( _.get ).
  // ... more RDD operations with the good data

def getData(r: Result) = {
  val key = r.getRow
  var id = "(unk)"
  var x = -1L

  try {
    id = Bytes.toString(key, 0, 11)
    x = Long.MaxValue - Bytes.toLong(key, 11)
    // ... more code that might throw exceptions

    Some( ( id, ( List(x),
          // more stuff ...
        ) ) )
  } catch {
    case e: NullPointerException => {
      logWarning("Skipping id=" + id + ", x=" + x + "; \n" + e)
      None
    }
  }
}

有没有更惯用的更短的方法来做到这一点?我觉得这看起来很乱,无论是在我正在做getData()的舞蹈中。map.filter.map

也许 aflatMap可以工作(在 a 中生成 0 或 1 个项目Seq),但我不希望它使我在 map 函数中创建的元组变平,只是消除空。

4

3 回答 3

8

另一种经常被忽视的方法是使用collect(PartialFunction pf),这意味着在 RDD 中“选择”或“收集”在部分函数中定义的特定元素。

代码如下所示:

val output = myRDD.collect{case Success(tuple) => tuple }

def getData(r: Result):Try[(String, List[X])] = Try {
        val id = Bytes.toString(key, 0, 11)
        val x = Long.MaxValue - Bytes.toLong(key, 11)
        (id, List(x))
}
于 2015-03-17T21:06:28.467 回答
7

如果你改变你getData的返回 ascala.util.Try那么你可以大大简化你的转换。像这样的东西可以工作:

def getData(r: Result) = {
  val key = r.getRow
  var id = "(unk)"
  var x = -1L

  val tr = util.Try{
    id = Bytes.toString(key, 0, 11)
    x = Long.MaxValue - Bytes.toLong(key, 11)
    // ... more code that might throw exceptions

    ( id, ( List(x)
          // more stuff ...
     ) )
  } 

  tr.failed.foreach(e => logWarning("Skipping id=" + id + ", x=" + x + "; \n" + e))
  tr
}

然后你的转换可以像这样开始:

myRDD.
  flatMap(tuple => getData(tuple._2).toOption)

如果你Try是一个Failure,它将变成一个None通孔toOption,然后作为flatMap逻辑的一部分被删除。那时,您在转换中的下一步将只处理成功的案例,即在getData没有包装的情况下返回的任何底层类型(即 No Option

于 2015-03-17T17:06:57.603 回答
2

如果您可以删除数据,那么您可以使用mapPartitions. 这是一个示例:

import scala.util._
val mixedData = sc.parallelize(List(1,2,3,4,0))
mixedData.mapPartitions(x=>{
  val foo = for(y <- x)
   yield {
    Try(1/y)
  }
  for{goodVals <- foo.partition(_.isSuccess)._1}
   yield goodVals.get
})

如果您想查看错误的值,那么您可以accumulator像以前一样使用或只使用日志。

您的代码将如下所示:

val output = myRDD.
  mapPartitions( tupleIter => getCleanData(tupleIter) )
  // ... more RDD operations with the good data

def getCleanData(iter: Iter[???]) = {
  val triedData = getDataInTry(iter)
  for{goodVals <- triedData.partition(_.isSuccess)._1}
    yield goodVals.get
}

def getDataInTry(iter: Iter[???]) = {
  for(r <- iter) yield {
    Try{
      val key = r._2.getRow
      var id = "(unk)"
      var x = -1L
      id = Bytes.toString(key, 0, 11)
      x = Long.MaxValue - Bytes.toLong(key, 11)
      // ... more code that might throw exceptions
    }
  }
}
于 2015-03-17T17:46:10.967 回答