I'm new to Spark.
I have a Spark Streaming batch job (maybe it should be Structured Streaming) that receives data from Kafka every hour.
I found that my Spark job keeps consuming data and never stops.
So I want to control it, for example:
if it is now 3 am, my job should consume the 2~3 am data from the Kafka topic, and in the next hour it should consume the 3~4 am data.
Any ideas? Thanks.
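One way I can imagine getting this bounded behavior is to run a plain hourly batch job instead of a streaming one: compute the previous hour's time window, then resolve those timestamps to Kafka offsets (e.g. via `KafkaConsumer.offsetsForTimes`). The sketch below only shows the window computation; `HourWindow` and `previousHourWindow` are illustrative names, not from any library.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

public class HourWindow {
    // Returns {startMillis, endMillis} for the hour preceding `now`,
    // e.g. at 03:15 the window is [02:00, 03:00).
    static long[] previousHourWindow(Instant now) {
        ZonedDateTime top = now.atZone(ZoneOffset.UTC).truncatedTo(ChronoUnit.HOURS);
        return new long[] {
                top.minusHours(1).toInstant().toEpochMilli(),
                top.toInstant().toEpochMilli()
        };
    }

    public static void main(String[] args) {
        long[] w = previousHourWindow(Instant.parse("2023-05-01T03:15:00Z"));
        // These epoch-millis boundaries could then be passed to
        // KafkaConsumer.offsetsForTimes(...) to find the offset range to read.
        System.out.println(w[0] + " .. " + w[1]);
    }
}
```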
---- Code ----
SparkConf sparkConf = new SparkConf().setAppName("CalculateHourlyFromKafka");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.minutes(60));
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.Subscribe(topicsSet, kafkaParams));
stream.foreachRDD((rdd, time) -> {
    SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
    // Parse each Kafka message (a CSV line) into a Span bean.
    JavaRDD<Span> rowRDD = rdd.map(message -> {
        Reader stringReader = new StringReader(message.value());
        List<Span> spanList = new CsvToBeanBuilder(stringReader).withType(Span.class).build().parse();
        return spanList.get(0);
    });
    Dataset<Row> spanDataFrame = spark.createDataFrame(rowRDD, Span.class);
    spanDataFrame.createOrReplaceTempView("span_data_raw");
    // Group by application name; count(*) is used as the aggregated column.
    Dataset<Row> aggregatedSpan =
            spark.sql("select " +
                    "TAGS_APPNAME as applicationname, " +
                    "count(*) as span_count " +
                    "from span_data_raw " +
                    "group by TAGS_APPNAME");
    aggregatedSpan.show();
});
jssc.start();
jssc.awaitTermination();
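If this were switched to an hourly batch read, Spark's Kafka source accepts explicit offset ranges through the `startingOffsets`/`endingOffsets` options as JSON, e.g. `spark.read().format("kafka").option("startingOffsets", json)`. A minimal helper for rendering that JSON, assuming the offsets per partition have already been resolved (`OffsetsJson` and `toOffsetsJson` are illustrative names):

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class OffsetsJson {
    // Renders a topic's per-partition offsets into the JSON shape the
    // Kafka source expects for startingOffsets/endingOffsets,
    // e.g. {"span_topic":{"0":100,"1":250}}.
    static String toOffsetsJson(String topic, Map<Integer, Long> offsets) {
        String parts = new TreeMap<>(offsets).entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\":" + e.getValue())
                .collect(Collectors.joining(","));
        return "{\"" + topic + "\":{" + parts + "}}";
    }

    public static void main(String[] args) {
        System.out.println(toOffsetsJson("span_topic", Map.of(0, 100L, 1, 250L)));
        // → {"span_topic":{"0":100,"1":250}}
    }
}
```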