2

我想在 Spark 中进行简单的机器学习。

首先,应用程序应该从文件中的历史数据中进行一些学习,训练机器学习模型,然后从 kafka 读取输入以实时给出预测。为此,我相信我应该使用火花流。但是,恐怕我并不真正了解火花流的工作原理。

代码如下所示:

def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("test App")
    val sc = new SparkContext(conf)
    val fromFile = parse(sc, Source.fromFile("my_data_.csv").getLines.toArray)
    ML.train(fromFile)

    real_time(sc)
}

ML 是一个包含一些机器学习内容的类,train 给它提供数据进行训练。还有一种方法classify可以根据所学知识计算预测。第一部分似乎工作正常,但real_time有一个问题:

def real_time(sc: SparkContext) : Unit = {
    val ssc = new StreamingContext(new SparkConf(), Seconds(1))
    val topic = "my_topic".split(",").toSet
    val params = Map[String, String](("metadata.broker.list", "localhost:9092"))
    val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, params, topic)

    var lin = dstream.map(_._2)
    val str_arr = new Array[String](0)
    lin.foreach {
        str_arr :+ _.collect()
    }
    val lines = parse(sc, str_arr).map(i => i.features)

    ML.classify(lines)
    ssc.start()
    ssc.awaitTermination()
}

我想做的是检查 Kafka 流并计算它是否有任何新行。好像不是这样的,我加了一些打印,没有打印。

如何使用火花流,在我的情况下应该如何使用?

4

0 回答 0