0

下面给出的是我的代码,我正在监视存储在 MapR 沙箱上的文件夹(我也尝试过使用我的本地文件系统,但仍然没有工作)我正在将新文件复制/移动(都尝试过)到文件夹中但得到 NO输出。

该程序不是流式传输和文件

object Log_Stream extends Serializable{


val timeout = 10 // Terminate after N seconds
val batchSeconds = 2 // Size of batch intervals

def main(args: Array[String]): Unit = {

val offsetReset = "earliest"
val batchInterval = "2"
val pollTimeout = "1000"
val FolderPath = "maprfs:///user/vipulrajan/logs"  //

val sparkConf = new SparkConf().setAppName("SensorStream").setMaster("local[2]").set("spark.testing.memory", "536870912")

val ssc = new StreamingContext(sparkConf, Seconds(batchInterval.toInt))

val messages = ssc.textFileStream(FolderPath)
println("message values received")


//val myData = values.map(x =>{parseSensor(x)})
messages.print()


// Start the computation
ssc.start()
// Wait for the computation to terminate
ssc.awaitTermination()


 }

}

我也尝试过重命名和编辑它们,但无济于事。我没有使用 spark submit 提交代码,而是直接从 Eclipse 运行它。

4

0 回答 0