我是 Apache Spark 的初学者。我想过滤掉所有权重总和大于 RDD 中的常数值的组。“权重”映射也是一个 RDD。这里是一个小型的demo,需要过滤的组存储在“groups”中,常量值为12:
val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
val wm = weights.toArray.toMap
def isheavy(inp: String): Boolean = {
val allw = inp.split(",").map(wm(_)).sum
allw > 12
}
val result = groups.filter(isheavy)
当输入数据非常大时,例如> 10GB,我总是遇到“java heap out of memory”错误。我怀疑它是否是由“weights.toArray.toMap”引起的,因为它将分布式 RDD 转换为 JVM 中的 Java 对象。所以我尝试直接用RDD过滤:
val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
def isheavy(inp: String): Boolean = {
val items = inp.split(",")
val wm = items.map(x => weights.filter(_._1 == x).first._2)
wm.sum > 12
}
val result = groups.filter(isheavy)
当我result.collect
将此脚本加载到 spark shell 后运行时,出现“java.lang.NullPointerException”错误。有人告诉我,当一个 RDD 在另一个 RDD 中被操作时,会出现空指针异常,并建议我将权重放到 Redis 中。
那么如何在不将“权重”转换为 Map 或将其放入 Redis 的情况下获得“结果”?如果没有外部数据存储服务的帮助,是否有一种解决方案可以基于另一个类似地图的 RDD 过滤 RDD?谢谢!