我有这个简单的 Kafka Stream
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
// Each Kafka message is a flight
val flights = messages.map(_._2)
flights.foreachRDD( rdd => {
println("--- New RDD with " + rdd.partitions.length + " partitions and " + rdd.count() + " flight records");
rdd.map { flight => {
val flightRows = FlightParser.parse(flight)
println ("Parsed num rows: " + flightRows)
}
}
})
ssc.start()
ssc.awaitTermination()
Kafka 有消息,Spark Streaming 能够将它们作为 RDD 获取。但是我的代码中的第二个 println 不打印任何内容。我在本地 [2] 模式下运行时查看了驱动程序控制台日志,在纱线客户端模式下运行时检查了纱线日志。
我错过了什么?
以下代码代替 rdd.map,在 spark 驱动程序控制台中可以很好地打印:
for(flight <- rdd.collect().toArray) {
val flightRows = FlightParser.parse(flight)
println ("Parsed num rows: " + flightRows)
}
但我担心这个飞行对象的处理可能发生在火花驱动程序项目中,而不是执行程序中。如果我错了,请纠正我。
谢谢