我正在尝试从 kafka 服务器创建一个 dStream,然后对该流进行一些转换。如果流为空(if(!rdd.partitions.isEmpty)
),我已经包含了一个捕获;但是,即使没有针对 kafka 主题发布任何事件,else
也从未达到过声明。
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
stream.foreachRDD { rdd =>
if(!rdd.partitions.isEmpty) {
val message = rdd.map((x$2) => x$2._2).collect().toList.map(parser)
val val = message(0)
} else println("empty stream...")
ssc.start()
ssc.awaitTermination()
}
是否有替代语句我应该使用KafkaUtils.createDirectStream
而不是检查流是否为空createStream
?