7

我不确定这是否是一个错误,所以如果你做这样的事情

// d:spark.RDD[String]
d.distinct().map(x => d.filter(_.equals(x)))

您将获得一个 Java NPE。但是,如果您collect立即执行distinct,一切都会好起来的。

我正在使用火花0.6.1。

4

2 回答 2

13

Spark 不支持嵌套 RDD 或引用其他 RDD 的用户定义函数,因此会出现 NullPointerException;在邮件列表上看到这个线程spark-users

看起来您当前的代码正在尝试d按值对元素进行分组;您可以使用groupBy() RDD方法有效地做到这一点:

scala> val d = sc.parallelize(Seq("Hello", "World", "Hello"))
d: spark.RDD[java.lang.String] = spark.ParallelCollection@55c0c66a

scala> d.groupBy(x => x).collect()
res6: Array[(java.lang.String, Seq[java.lang.String])] = Array((World,ArrayBuffer(World)), (Hello,ArrayBuffer(Hello, Hello)))
于 2013-01-02T22:52:12.307 回答
0

Spark 1.3.0 流编程指南中提供的窗口示例怎么样

val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

SPARK-5063 导致示例失败,因为连接是从 RDD 上的转换方法中调用的

于 2015-04-10T22:06:23.660 回答