0

我想从 DStream 中删除前 n 个 RDD。我尝试将以下函数与转换一起使用,但它不起作用(错误 OneForOneStrategy:org.apache.spark.SparkContext java.io.NotSerializableException),我认为它不会实现我删除 RDD 的真正目标因为它会返回空的。

var num = 0
def dropNrdds(myRDD: RDD[(String, Int)], dropNum: Int) : RDD[(String, Int)] = {
    if (num < dropNum) {
        num = num + 1
        return myRDD
    }
    else {
        return sc.makeRDD(Seq())
    }
}
4

2 回答 2

1

该错误是因为您的函数引用了您的var num并且包含的​​类不是Serializable。您的函数将由集群的不同节点调用,因此它所依赖的任何内容都必须是Serializable,并且您不能在函数的不同调用之间共享变量(因为它们可能在不同的集群节点上运行)。

RDD从 a 中删除特定数量的 s似乎很奇怪,因为拆分DStream特定对象的方式DStream几乎是一个实现细节。也许基于时间的slice方法可以做你想做的事?

于 2014-10-28T10:10:03.940 回答
0

您收到错误是因为,我猜您正在调用此函数

foreachRdd

循环,它实际上在执行器节点上执行,如果你想在执行器节点上执行某些代码必须是可序列化的,并且 SparkContext(sc,你在你的 dropNrdds 方法中引用它)不是可序列化的,因此你得到了错误。

并提出您的实际问题。

不确定您的要求,但

您可以为您的 RDD 创建一个 DataFrame 并选择符合您条件的记录。并忽略其余的。

或者

您可以使用过滤器并使用过滤器数据创建一个新的 RDD。

于 2016-08-08T21:09:21.533 回答