1

我正在尝试官方指南末尾的 Streaming Clustering 示例代码,但出现类型错误。这是我的代码:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.clustering.StreamingKMeans

object Kmeans {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("kmeans")
    val ssc = new StreamingContext(conf, Seconds(3))

    val trainingData = ssc.textFileStream("training").map(Vectors.parse)
    val testData = ssc.textFileStream("test").map(LabeledPoint.parse)

    val numDimensions = 3
    val numClusters = 2
    val model = new StreamingKMeans()
      .setK(numClusters)
      .setDecayFactor(1.0)
      .setRandomCenters(numDimensions, 0.0)

    model.trainOn(trainingData)
    model.predictOnValues(testData).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

但是当我跑步时

sbt package

我收到以下错误:

[error]  found   : org.apache.spark.streaming.dstream.DStream[org.apache.spark.mllib.regression.LabeledPoint]
[error]  required: org.apache.spark.streaming.dstream.DStream[(?, org.apache.spark.mllib.linalg.Vector)]
[error]     model.predictOnValues(testData).print()
[error]                           ^
[error] one error found
[error] (compile:compile) Compilation failed
4

1 回答 1

2

您需要映射testData: DStream[LabeledPoint]DStream[(K, Vector)]

model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

你可以在这里找到完整的例子:StreamingKMeansExample.scala

于 2015-02-13T16:09:05.230 回答