3

我正在尝试从 kafka 服务器创建一个 dStream,然后对该流进行一些转换。如果流为空(if(!rdd.partitions.isEmpty)),我已经包含了一个捕获;但是,即使没有针对 kafka 主题发布任何事件,else也从未达到过声明。

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

stream.foreachRDD { rdd =>
    if(!rdd.partitions.isEmpty) {

        val message = rdd.map((x$2) => x$2._2).collect().toList.map(parser)

        val val = message(0)

    } else println("empty stream...")

    ssc.start() 
    ssc.awaitTermination()

}

是否有替代语句我应该使用KafkaUtils.createDirectStream而不是检查流是否为空createStream

4

1 回答 1

4

使用RDD.isEmpty而不是RDD.partitions.isEmpty添加检查底层分区是否实际具有元素:

stream.foreachRDD { rdd =>
  if(!rdd.isEmpty) {
    // Stuff
  }
}

不起作用的原因RDD.partitions.isEmpty是 内部存在一个分区RDD,但该分区本身是空的。但从partitionswhich is an的角度来看Array[Partition],它不是空的。

于 2016-11-02T17:09:18.090 回答