I am running some tests in spark-shell after loading the jar with the Twitter utilities. Here is a sequence of code that works:

// launch:
// spark-shell --driver-memory 1g --master local[3] --jars target/scala-2.10/tweetProcessing-1.0.jar

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._

val consumerKey = ...
val consumerSecret = ...
val accessToken = ...
val accessTokenSecret = ...
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

val ssc = new StreamingContext(sc, Seconds(60))
val tweetStream = TwitterUtils.createStream(ssc, None)
val myNewStream = tweetStream.map(tweet => tweet.getText)
    .map(tweetText => tweetText.toLowerCase.split("\\W+"))
    .transform(rdd => 
        rdd.map(tweetWordSeq => { 
            tweetWordSeq.foreach { word => { 
                val mySet = Set("apple", "orange");
                if(!(mySet)(word)) word }
            }
        }))
myNewStream.foreachRDD((rdd,time) => { 
    println("%s at time %s".format(rdd.count(),time.milliseconds))
})
ssc.start()

(I have actually minimized the computation I perform, just to highlight the problem.) Here mySet gets serialized with the closure, and everything works fine.
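A minimal sketch of what happens here: Spark serializes any local value captured in a closure and ships a copy of it with every task, so a variant that defines the set once on the driver (the name stopWords is just illustrative) also works:

// stopWords is a plain local value; Spark serializes it into the task
// closure and sends a copy with every single task -- fine for a tiny
// set, wasteful for a large one.
val stopWords = Set("apple", "orange")
val cleaned = tweetStream.map(tweet => tweet.getText)
    .map(tweetText => tweetText.toLowerCase.split("\\W+").filterNot(stopWords))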

But when I use a broadcast variable and replace the test accordingly:

val ssc = new StreamingContext(sc, Seconds(60))

val mySet = sc.broadcast(Set("apple", "orange"))

val tweetStream = TwitterUtils.createStream(ssc, None)
val myNewStream = tweetStream.map(tweet => tweet.getText)
    .map(tweetText => tweetText.toLowerCase.split("\\W+"))
    .transform(rdd => 
        rdd.map(tweetWordSeq => { 
            tweetWordSeq.foreach { word => { 
                if(!(mySet.value)(word)) word }
            }
        }))
myNewStream.foreachRDD((rdd,time) => { 
    println("%s at time %s".format(rdd.count(),time.milliseconds))
})
ssc.start()

I get:

ERROR JobScheduler: Error generating jobs for time 1464335160000 ms
org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: Object of org.apache.spark.streaming.dstream.TransformedDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.

I would naturally prefer to use a broadcast variable (my set is actually a fairly large set of stop words), but I don't quite see where the problem lies.

1 Answer

You need to create the broadcast variable in the driver (outside of any closure), not inside any transformation such as transform, foreachRDD, etc.:

val ssc = new StreamingContext(sc, Seconds(60))
val mySet = ssc.sparkContext.broadcast(Set("apple", "orange"))

You can then access the broadcast variable in transform or in other DStream closures running on the executors, e.g.:

!(mySet.value)(word)

If you include the statement sc.broadcast(Set("apple", "orange")) inside a closure such as rdd.map or transform, the driver will try to ship the StreamingContext to all of the executors, and it is not serializable. That is why you see the NotSerializableException.
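Putting the two pieces together, a minimal sketch of the corrected pipeline (same shape as the code in the question; stopWords and filteredWords are just illustrative names, and the stop-word check is reduced to a simple filterNot):

val ssc = new StreamingContext(sc, Seconds(60))

// Created once on the driver, outside of any closure.
val stopWords = ssc.sparkContext.broadcast(Set("apple", "orange"))

val tweetStream = TwitterUtils.createStream(ssc, None)
val filteredWords = tweetStream.map(tweet => tweet.getText)
    .map(tweetText => tweetText.toLowerCase.split("\\W+"))
    .transform(rdd =>
        // Only the broadcast wrapper is captured here, not sc or ssc,
        // so the closure stays serializable.
        rdd.map(tweetWordSeq => tweetWordSeq.filterNot(stopWords.value)))
filteredWords.foreachRDD((rdd, time) => {
    println("%s at time %s".format(rdd.count(), time.milliseconds))
})
ssc.start()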

answered 2016-05-27T11:42:26.383