我的 spark-streaming 应用程序出现了错误。我使用 kafka 作为输入流:使用套接字(socket)时一切正常,但改用 kafka 后就会报错。有人知道为什么会抛出这个错误吗?我是否需要调整批处理间隔(batch time)和检查点间隔(checkpointing time)?
ERROR StreamingContext: Error starting the context, marking it as stopped(启动上下文时出错,将其标记为已停止) java.lang.StackOverflowError
我的程序:
/**
 * Entry point of the streaming job.
 *
 * Obtains a StreamingContext through `StreamingContext.getOrCreate` (restored from
 * `checkpointDirectory` when checkpoint data exists there, freshly built otherwise),
 * then starts it and blocks until the computation terminates.
 *
 * @param args args(0) is passed to Kafka as `metadata.broker.list`;
 *             args(1) is a comma-separated list of topic names.
 *             Both are read only when a new context has to be built.
 */
def main(args: Array[String]): Unit = {

  // Builds the complete DStream graph for a brand-new context. It is handed to
  // StreamingContext.getOrCreate as the factory, so all stream set-up lives in here.
  def functionToCreateContext(): StreamingContext = {
    // Validated here rather than at the top of main, so that a restart from existing
    // checkpoint data does not demand arguments it never reads.
    require(
      args.length >= 2,
      "Expected arguments: <brokers> <comma-separated topics>")
    val brokers = args(0)
    val topicsSet = args(1).split(",").toSet

    val conf = new SparkConf().setAppName("HBaseStream")
    val sc = new SparkContext(conf)
    // The main entry point for all streaming functionality; 5-second batches.
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint(checkpointDirectory)

    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    // Keep only the second element of each (key, value) pair, i.e. the message payload.
    val inputStream = messages.map(_._2)
    // Socket-based alternative kept from the original for reference:
    // val inputStream = ssc.socketTextStream(args(0), args(1).toInt)
    inputStream.print(1)

    // Each line is comma-separated: field 1 becomes the key, fields 2.. become Longs.
    // NOTE(review): a line with fewer than two fields, or a non-numeric value, throws
    // inside the task; decide whether such records should be filtered out instead.
    val parsedStream = inputStream.map { line =>
      val fields = line.split(",")
      (fields(1), fields.drop(2).map(_.trim.toLong))
    }

    import breeze.linalg.{DenseVector => BDV}
    import scala.util.Try

    // Running element-wise sum per key: the previous total (if any) plus every vector
    // that arrived in this batch.
    val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
      (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
        val vectors = prev.map(_ +: current).getOrElse(current)
        // Any failure while summing is turned into None by Try(...).toOption.
        Try(vectors.map(BDV(_)).reduce(_ + _).toArray).toOption
      })
    // Checkpoint the state stream every 10 seconds (two batch intervals).
    state.checkpoint(Duration(10000))
    state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
    ssc
  }

  // Get StreamingContext from checkpoint data or create a new one.
  val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

  // Declaring the graph does not run anything: the context must be started explicitly,
  // and main has to block, otherwise it returns before any batch is processed.
  context.start()
  context.awaitTermination()
}
}