
I'm new to Spark.

I have a Spark Streaming batch job (maybe it should be Structured Streaming) that receives data from Kafka every hour.

I found that my Spark job keeps consuming data and never stops.

So I want to control it. For example:

Now it is 3 am, and my Spark job should consume the data between 2 and 3 am from the Kafka topic; the next hour, it should consume the data between 3 and 4 am.

Any ideas? Thanks.
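To make the window I mean concrete, here is a minimal sketch of computing the previous full hour as an epoch-millis range (the `HourWindow` class and `previousHourWindow` method are names I made up, and I assume UTC). A timestamp pair like this is what `KafkaConsumer.offsetsForTimes` expects when translating times into per-partition offsets:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

public class HourWindow {

    // Given "now", return the [start, end) epoch-millis range of the previous
    // full hour. E.g. at 03:15 the window is 02:00 (inclusive) to 03:00 (exclusive).
    static long[] previousHourWindow(Instant now) {
        // Truncate to the top of the current hour: that is the window's end.
        ZonedDateTime top = now.atZone(ZoneOffset.UTC).truncatedTo(ChronoUnit.HOURS);
        long end = top.toInstant().toEpochMilli();
        long start = top.minusHours(1).toInstant().toEpochMilli();
        return new long[] { start, end };
    }

    public static void main(String[] args) {
        long[] w = previousHourWindow(Instant.parse("2023-05-01T03:15:00Z"));
        System.out.println("window: [" + w[0] + ", " + w[1] + ")");
        // These two timestamps could then be passed to
        // KafkaConsumer.offsetsForTimes(...) to look up the matching offsets.
    }
}
```

My understanding is that with such offsets one could run a plain batch job per hour (e.g. `KafkaUtils.createRDD` with explicit `OffsetRange`s) instead of a streaming job that never stops, but I'm not sure if that is the right approach.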

---- Code ----

SparkConf sparkConf = new SparkConf().setAppName("CalculateHourlyFromKafka");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.minutes(60));
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.Subscribe(topicsSet, kafkaParams));
stream.foreachRDD((rdd, time) -> {
    SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
    // Parse each Kafka message (a CSV line) into a Span bean.
    JavaRDD<Span> rowRDD = rdd.map(message -> {
        Reader stringReader = new StringReader(message.value());
        List<Span> spanList = new CsvToBeanBuilder(stringReader).withType(Span.class).build().parse();
        return spanList.get(0);
    });
    Dataset<Row> spanDataFrame = spark.createDataFrame(rowRDD, Span.class);

    spanDataFrame.createOrReplaceTempView("span_data_raw");

    // Note: the original query had a trailing comma before "from", which is a syntax error.
    Dataset<Row> aggregatedSpan =
            spark.sql("select " +
                    "TAGS_APPNAME as applicationname " +
                    "from span_data_raw " +
                    "group by TAGS_APPNAME");
    aggregatedSpan.show();
});
