
Please take a look at the following code snippet:

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.Seconds

abstract class MQTTDStream[T <: Any](ssc: StreamingContext) extends DStream(ssc) {
  override def compute(validTime: Time): Option[RDD[T]] =
    Some(ssc.sparkContext.parallelize(Seq(1, 2, 3), 1)) // This line doesn't compile

  override def dependencies = Nil

  override def slideDuration = Seconds(1) // just an example
}

I get the following error:

type mismatch;
 found   : Int(1)
 required: T

I have declared T to extend Any, so why is the compiler complaining? Int is a subtype of Any, isn't it?

Thanks a lot!

Update 2.9.16:

I changed it to extend from DStream[Int], but I still get the same error:

abstract class MQTTDStream[T](ssc: StreamingContext) extends DStream[Int](ssc) {
  override def compute(validTime: Time): Option[RDD[T]] =
    Some(ssc.sparkContext.parallelize(Seq(1, 2, 3), 1)) // This line doesn't compile

  override def dependencies = Nil

  override def slideDuration = Seconds(1) // just an example
}

Edit 2.9.16:

Thanks to Alexey, here is the working solution:

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.Seconds

abstract class MQTTDStream[T](ssc: StreamingContext) extends DStream[Int](ssc) {
  override def compute(validTime: Time): Option[RDD[Int]] =
    Some(ssc.sparkContext.parallelize(Seq(1, 2, 3), 1))

  override def dependencies = Nil

  override def slideDuration = Seconds(1) // just an example
}
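
For reference, a minimal usage sketch of the fixed class. The local-mode configuration, the demo object and the anonymous concrete subclass below are my own illustration, not part of the original setup:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MQTTDStreamDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("MQTTDStreamDemo")
    val ssc = new StreamingContext(conf, Seconds(1))

    // MQTTDStream already overrides compute, dependencies and slideDuration,
    // so an anonymous subclass with an empty body is enough to instantiate it.
    val stream = new MQTTDStream[String](ssc) {}

    stream.print() // prints 1, 2, 3 for every one-second batch
    ssc.start()
    ssc.awaitTermination()
  }
}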

1 Answer


The caller gets to choose T, not you. So your class definition has to work for every T (satisfying the type bound, but every T is a subtype of Any anyway).

That is, if someone creates e.g. an MQTTDStream[String], then its compute method has to return an Option[RDD[String]]. But it doesn't: it returns a Some[RDD[Int]].
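
The same point can be reproduced without Spark. Here is a minimal, Spark-free sketch (the Container/BadContainer/IntContainer names are made up for illustration):

abstract class Container[T] {
  // The caller picks T (e.g. by writing new Container[String] { ... }),
  // so this method must return a Seq[T] for every possible T.
  def compute(): Option[Seq[T]]
}

// Does not compile: Seq(1, 2, 3) is a Seq[Int], but for a Container[String]
// compute() would have to return an Option[Seq[String]].
// abstract class BadContainer[T] extends Container[T] {
//   override def compute(): Option[Seq[T]] = Some(Seq(1, 2, 3))
// }

// Compiles: the element type is fixed to Int, matching the values produced.
class IntContainer extends Container[Int] {
  override def compute(): Option[Seq[Int]] = Some(Seq(1, 2, 3))
}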

answered 2016-08-31T20:35:02.123