我想在 Spark 中进行简单的机器学习。
首先,应用程序应该从文件中的历史数据中进行一些学习,训练机器学习模型,然后从 kafka 读取输入以实时给出预测。为此,我相信我应该使用火花流。但是,恐怕我并不真正了解火花流的工作原理。
代码如下所示:
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("test App")
val sc = new SparkContext(conf)
val fromFile = parse(sc, Source.fromFile("my_data_.csv").getLines.toArray)
ML.train(fromFile)
real_time(sc)
}
ML 是一个包含一些机器学习内容的类,train 给它提供数据进行训练。还有一种方法classify
可以根据所学知识计算预测。第一部分似乎工作正常,但real_time
有一个问题:
def real_time(sc: SparkContext) : Unit = {
val ssc = new StreamingContext(new SparkConf(), Seconds(1))
val topic = "my_topic".split(",").toSet
val params = Map[String, String](("metadata.broker.list", "localhost:9092"))
val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, params, topic)
var lin = dstream.map(_._2)
val str_arr = new Array[String](0)
lin.foreach {
str_arr :+ _.collect()
}
val lines = parse(sc, str_arr).map(i => i.features)
ML.classify(lines)
ssc.start()
ssc.awaitTermination()
}
我想做的是检查 Kafka 流并计算它是否有任何新行。好像不是这样的,我加了一些打印,没有打印。
如何使用火花流,在我的情况下应该如何使用?