我在下面有这段代码:
object Test {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Spark").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(3))
val kafkaBrokers = Map("metadata.broker.list" -> "HostName:9092")
val offsetMap = Map(TopicAndPartition("topic_test", 0), 8)
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaBrokers, offsetMap)
var offsetArray = Array[OffsetRange]()
lines.transform {rdd =>
offsetArray = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map {
_.message()
}.foreachRDD {rdd =>
/* NEW CODE */
}
ssc.start()
ssc.awaitTermination()
}
}
我在评论下添加了新代码/* NEW CODE */
。我的问题是这些val
行将包含一系列 RDD,它们基本上每 3 秒形成一次 kafka 服务器。然后我使用 map 函数获取消息。
但我对这个foreachRDD
功能的作用有点困惑。这是否会遍历其中的所有内容RDD's
(lines DStream
这是我想要做的)?问题是lift-json
库中的 parse 函数只接受一个字符串,所以我需要遍历所有 rdd 并将该字符串值传递给我试图做的 parse 函数。但是由于某种原因没有打印出来。