1

我正在尝试构建一个 Spark 应用程序,它将根据我拥有的单词列表过滤 Twitter 流媒体提要。我的列表中有大约 8000 个单词(Twitter 过滤 API 最多支持 400 个单词)。我想将每条传入的推文标记为单词,然后检查这个单词是否存在于我的列表中。如果推文中的任何单词在列表中,那么我应该打印推文,否则拒绝它。

我编写了以下代码来实现这一点(我在spark-shell上一次复制粘贴以下代码几行,这是测试/运行我的代码的正确方法吗?):

// excluding imports to keep in concise 
val consumerKey = "" // removed while posting on SOF
val consumerSecret = "" // removed while posting on SOF
val accessToken = "" // removed while posting on SOF
val accessTokenSecret = "" // removed while posting on SOF
val url = "https://stream.twitter.com/1.1/statuses/filter.json"

val sparkConf = new SparkConf().setAppName("Twitter Sentiment Analysis")
val sc = new SparkContext(sparkConf)


val csvFilterWordsList = sc.textFile("<path to file>/uniq_list_8.0_sorted")
var filterWordsList : Set[String] = Set()
for(filterWords <- csvFilterWordsList.collect()) {
    filterWordsList += filterWords.split(",")(0)
}



// Twitter Streaming
val ssc = new JavaStreamingContext(sc,Seconds(2))

val conf = new ConfigurationBuilder()
conf.setOAuthAccessToken(accessToken)
conf.setOAuthAccessTokenSecret(accessTokenSecret)
conf.setOAuthConsumerKey(consumerKey)
conf.setOAuthConsumerSecret(consumerSecret)
conf.setStreamBaseURL(url)
conf.setSiteStreamBaseURL(url)

val filter = Array("twitter")

val auth = AuthorizationFactory.getInstance(conf.build())
val tweets : JavaReceiverInputDStream[twitter4j.Status] = TwitterUtils.createStream(ssc, auth, filter)

object test extends Serializable {
def similarity(tweet : twitter4j.Status, wordsList : Set[String]) : String = {
    val tweetTokenized = tweet.getText.replaceAll("[^a-zA-Z0-9]", " ").split(" ")
    var flag = false
    for(word <- tweetTokenized) {
        if(wordsList.contains(word)) {
            flag = true
        }
    }
    if(flag && tweet.getUser.getLang == "en") {
     return (tweet.getText + "," + tweet.getUser.getLang)
    }
    else {
     return ""
    }
}
}
val statuses = tweets.dstream.map(status => test.similarity(status,filterWordsList))

statuses.print()
ssc.start()

但是在运行这个我得到以下异常:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:436)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:64)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:68)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:70)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:76)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:78)
    at $iwC$$iwC$$iwC.<init>(<console>:80)
    at $iwC$$iwC.<init>(<console>:82)
    at $iwC.<init>(<console>:84)
    at <init>(<console>:86)
    at .<init>(<console>:90)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: twitter4j.conf.ConfigurationBuilder
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)

但是当我只是做一个简单的映射时,
val statuses = tweets.dstream.map(status => status.getText)
它工作得很好。

有人可以帮我解决我做错了什么吗?

4

1 回答 1

4

Spark-shell 将代码封装在匿名类中,以将代码序列化并发送给工作人员。有时很难知道正在捕获什么以及在哪个范围内。如果您在 spark-shell 中复制/粘贴代码,即使您粘贴在一起的行的顺序和数量(:paste例如)也会导致不同的类结构。

避免序列化问题的经验法则是标记@transient所有您知道在 dstream 操作中不需要的元素。在这种特定情况下,我会将瞬态注释添加到conf,authtweets.

于 2014-12-01T17:43:40.300 回答