我有两个节点独立集群用于火花流处理。下面是我的示例代码,它演示了我正在执行的过程。
sparkConf.setMaster("spark://rsplws224:7077")
val ssc=new StreamingContext()
println(ssc.sparkContext.master)
val inDStream = ssc.receiverStream //batch of 500 ms as i would like to have 1 sec latency
val filteredDStream = inDStream.filter // filtering unwanted tuples
val keyDStream = filteredDStream.map // converting to pair dstream
val stateStream = keyDStream .updateStateByKey //updating state for history
stateStream.checkpoint(Milliseconds(2500)) // to remove long lineage and meterilizing state stream
stateStream.count()
val withHistory = keyDStream.join(stateStream) //joining state wit input stream for further processing
val alertStream = withHistory.filter // decision to be taken by comparing history state and current tuple data
alertStream.foreach // notification to other system
我的问题是 spark 没有将此状态 RDD 分配给多个节点或没有将任务分配给其他节点并导致响应的高延迟,我的输入负载约为每秒 100,000 个元组。
我已经尝试过以下事情,但没有任何效果
1)spark.locality.wait
到 1 秒
2)减少分配给执行程序进程的内存以检查天气火花分发RDD或任务,但即使它超出了驱动器也在运行的第一个节点(m1)的内存限制。
3) 将 spark.streaming.concurrentJobs 从 1(默认)增加到 3
4) 我检查了流 ui 存储,状态 dstream RDD 大约有 20 个分区,都位于本地节点 m1 上。
如果我运行 SparkPi 100000,那么 spark 能够在几秒钟(30-40)后利用另一个节点,所以我确信我的集群配置很好。
编辑
我注意到的一件事是,即使对于我的 RDD,如果我设置存储级别 MEMORY_AND_DISK_SER_2 然后也在应用程序 ui 存储中显示Memory Serialized 1x Replicated