我想将 DStream 中的每个 RDD 与非流式、不变的参考文件一起加入。这是我的代码:
val sparkConf = new SparkConf().setAppName("LogCounter")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val sc = new SparkContext()
val geoData = sc.textFile("data/geoRegion.csv")
.map(_.split(','))
.map(line => (line(0), (line(1),line(2),line(3),line(4))))
val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val goodIPsFltrBI = lines.filter(...).map(...).filter(...) // details removed for brevity
val vdpJoinedGeo = goodIPsFltrBI.transform(rdd =>rdd.join(geoData))
我收到很多很多错误,最常见的是:
14/11/19 19:58:23 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
java.io.FileNotFoundException: http://10.102.71.92:40764/broadcast_1
我认为我应该广播 geoData 而不是在每个任务中读取它(它是一个 100MB 的文件),但我不确定将第一次初始化 geoData 的代码放在哪里。
另外我不确定 geoData 是否定义正确(也许它应该使用 ssc 而不是 sc?)。我看到的文档只列出了转换和连接,但没有显示静态文件是如何创建的。
关于如何广播 geoData 然后将其加入每个流式 RDD 的任何想法?