3

我在 Spark(Scala)中有一些 DStream,我想对其进行排序,然后取前 N 个元素。问题是,每当我运行程序时,都会抛出 NotSerializableException 异常:

这是因为 DStream 对象是从闭包中引用的。

问题是我不知道如何解决它:

这是我的尝试:

package com.badrit.realtime

import java.util.Date

import com.badrit.drivers.UnlimitedSpaceTimeDriver
import com.badrit.model.{CellBuilder, DataReader, Trip}
import com.badrit.utility.Printer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Duration, Milliseconds, StreamingContext}

import scala.collection.mutable

/**
 * Spark Streaming driver that replays trips read from a CSV file as a queue-backed
 * DStream, applies a sliding window to it and prints a sample of each window.
 */
object StreamingDriver {
    val appName: String = "HotSpotRealTime"
    val hostName = "localhost"
    val port = 5050
    val constrains = UnlimitedSpaceTimeDriver.constrains
    // Batch interval, window length and slide interval. All three are handed to
    // Milliseconds(...) / Duration(...) in main, so they are millisecond values.
    var streamingRate = 1
    var windowSize = 8
    var slidingInterval = 2
    val cellBuilder = new CellBuilder(constrains)
    val inputFilePath = "/home/ahmedelgamal/Downloads/green_tripdata_2015-02.csv"

    /**
     * Builds a test input stream from the CSV file at `inputFilePath`.
     *
     * Trips picked up before 2015-02-02 00:00:00 (local time) are grouped by the
     * minute-of-hour of their pickup time; every group becomes one RDD in a queue
     * that is exposed as a DStream delivering one RDD per batch. Per-group passenger
     * statistics are printed to stdout as a side effect.
     *
     * @param sparkStreamCtx streaming context used to create the RDDs and the stream
     * @return a queue-backed input stream of trips
     */
    def prepareTestData(sparkStreamCtx: StreamingContext): InputDStream[Trip] = {

        val sparkCtx = sparkStreamCtx.sparkContext
        val textFile: RDD[String] = sparkCtx.textFile(inputFilePath)
        val data: RDD[Trip] = new DataReader().getTrips(textFile)

        // The deprecated Date(year, month, ...) constructor expects "year - 1900", so
        // Date(2015, 1, 2, ...) denotes a day in year 3915 and the filter kept every trip.
        // Build the intended cut-off (2 Feb 2015, midnight) through the Calendar API.
        val cutoff: Date =
            new java.util.GregorianCalendar(2015, java.util.Calendar.FEBRUARY, 2, 0, 0, 0).getTime
        val groupedData = data.filter(_.pickup.date.before(cutoff))
          .groupBy(trip => trip.pickup.date.getMinutes).sortBy(_._1).map(_._2).collect()

        // print instead of printf: the text is not a format string.
        print(s"Grouped Data Count is ${groupedData.length}")
        val dataQueue: mutable.Queue[RDD[Trip]] = mutable.Queue.empty

        groupedData.foreach(trips => dataQueue += sparkCtx.makeRDD(trips.toArray))
        print(s"\n\nTest Queue size is ${dataQueue.size}")

        // Debug output: passenger statistics for every group.
        groupedData.zipWithIndex.foreach { case (trips, index) =>
            println("Items List " + index)

            val passengers: Array[Int] = trips.map(_.passengers).toArray
            val cnt = passengers.length
            println("Sum is " + passengers.sum)
            println("Cnt is " + cnt)

            val passengersRdd = sparkCtx.parallelize(passengers)
            println("Mean " + passengersRdd.mean())
            println("Stdv" + passengersRdd.stdev())
        }

        sparkStreamCtx.queueStream(dataQueue, oneAtATime = true)
    }


    /** Maps a trip to the cell of its pickup stop, as computed by `cellBuilder`. */
    def cellCreator(trip: Trip) = cellBuilder.cellForCarStop(trip.pickup)

    /**
     * Entry point.
     *
     * @param args optional `streamingRate windowSize slidingInterval` (integers,
     *             milliseconds). Built-in defaults are used unless all three are given.
     */
    def main(args: Array[String]): Unit = {
        // All three values are read together, so require all three; the previous
        // `args.length < 1` check crashed with ArrayIndexOutOfBoundsException when
        // only one or two arguments were supplied.
        if (args.length < 3) {
            streamingRate = 1
            windowSize = 3
            slidingInterval = 2
        }
        else {
            streamingRate = args(0).toInt
            windowSize = args(1).toInt
            slidingInterval = args(2).toInt
        }

        val sparkConf = new SparkConf().setAppName(appName).setMaster("local[*]")
        val sparkStreamCtx = new StreamingContext(sparkConf, Milliseconds(streamingRate))
        sparkStreamCtx.sparkContext.setLogLevel("ERROR")
        // NOTE(review): queueStream-based input is documented as not supporting
        // checkpointing, and nothing visible here appears to need it - consider
        // removing this call; confirm against the Spark version in use.
        sparkStreamCtx.checkpoint("/tmp")

        val data: InputDStream[Trip] = prepareTestData(sparkStreamCtx)
        val dataWindow = data.window(new Duration(windowSize), new Duration(slidingInterval))

        // Take the SparkContext from the RDD itself. Referencing sparkStreamCtx inside the
        // closure drags the non-serializable StreamingContext into the DStream graph, which
        // is serialized when checkpointing is enabled -> NotSerializableException.
        val newDataWindow = dataWindow.transform(rdd => rdd.sparkContext.parallelize(rdd.take(10)))
        newDataWindow.print()

        sparkStreamCtx.start()
        sparkStreamCtx.awaitTerminationOrTimeout(1000)

    }
}

除了我自己的做法之外,我也乐意接受任何其他能够对 DStream 进行排序并取出前 N 个元素的方法。

4

1 回答 1

2

您可以在 DStream 对象上使用 transform 方法:先取出输入 RDD 中按键排序后的前 n 个元素放入一个列表,然后过滤原始 RDD,只保留包含在该列表中的元素。

val n = 10
// For every batch RDD keep only the elements that are among the n entries with the
// smallest first component (ascending order, same selection as sortBy(_._1).take(n)).
// takeOrdered collects the n smallest elements without sorting the whole RDD first.
// Use rdd.top(n)(Ordering.by(_._1)) instead if the n largest keys are wanted.
val topN = result.transform { rdd =>
   val firstN = rdd.takeOrdered(n)(Ordering.by(_._1))
   rdd.filter(firstN.contains(_))
}
topN.print()
于 2016-08-08T11:44:24.690 回答