1

我想用一些额外的过滤条件用小 RDD 迭代一个 BIG RDD。下面的代码工作正常,但该过程仅在 Driver 和 Not spread-ed 跨节点的情况下运行。所以请建议任何其他方法?

val cross = titlesRDD.cartesian(brRDD).cache()
 val matching = cross.filter{ case( x, br) => 
    ((br._1 == "0") && 
   (((br._2 ==((x._4))) &&
    ((br._3 exists (x._5)) || ((br._3).head=="")) 
}

谢谢,马杜

4

1 回答 1

3

您可能不想缓存cross. 我相信,不缓存它会让笛卡尔积根据过滤器的需要“即时”发生,而不是在内存中实例化由笛卡尔积产生的潜在大量组合。

此外,您可以brRDD.filter(_._1 == "0")在使用 进行笛卡尔积之前进行操作titlesRDD,例如

val cross = titlesRDD.cartesian(brRRD.filter(_._1 == "0"))

然后matching适当地修改用于创建的过滤器。

于 2015-09-27T20:52:49.720 回答