下面给出的是我的代码,我正在监视存储在 MapR 沙箱上的文件夹(我也尝试过使用我的本地文件系统,但仍然没有工作)我正在将新文件复制/移动(都尝试过)到文件夹中但得到 NO输出。
该程序不是流式传输和文件
object Log_Stream extends Serializable{
val timeout = 10 // Terminate after N seconds
val batchSeconds = 2 // Size of batch intervals
def main(args: Array[String]): Unit = {
val offsetReset = "earliest"
val batchInterval = "2"
val pollTimeout = "1000"
val FolderPath = "maprfs:///user/vipulrajan/logs" //
val sparkConf = new SparkConf().setAppName("SensorStream").setMaster("local[2]").set("spark.testing.memory", "536870912")
val ssc = new StreamingContext(sparkConf, Seconds(batchInterval.toInt))
val messages = ssc.textFileStream(FolderPath)
println("message values received")
//val myData = values.map(x =>{parseSensor(x)})
messages.print()
// Start the computation
ssc.start()
// Wait for the computation to terminate
ssc.awaitTermination()
}
}
我也尝试过重命名和编辑它们,但无济于事。我没有使用 spark submit 提交代码,而是直接从 Eclipse 运行它。