0

我现在正在使用 Spark Streaming + Kafka 来构建我的消息处理系统。但是我有一点技术问题,我将在下面描述它:

例如,我想每10 分钟做一次 wordcount ,所以,在我最早的代码中,我将 Batch Interval 设置为 10 分钟。代码如下:

 val sparkConf = new SparkConf().setAppName(args(0)).setMaster(args(1))
 val ssc = new StreamingContext(sparkConf, Minutes(10))

但我认为这不是一个很好的解决方案,因为 10 分钟是一个很长的时间和大量的数据,我的内存无法承受这么多数据。所以,我想将批处理间隔减少到 1 分钟,例如:

 val sparkConf = new SparkConf().setAppName(args(0)).setMaster(args(1))
 val ssc = new StreamingContext(sparkConf, Minutes(1))

那么问题来了:十个‘1分钟’怎么能总结出10分钟的结果呢?我认为这个词只能在驱动程序而不是工人程序中完成,我该怎么办?

我是 Spark Streaming 的新学习者。任何人都可以帮我一把吗?

4

2 回答 2

0

也许我有我的想法。在这种情况下,我应该使用有状态函数UpdateStateByKey()因为,因为我想要的是全局 10 分钟的结果,但我能得到的只是每 1 分钟的每个中间结果,所以在每 10 分钟结束之前,我必须记录每 1 分钟的状态,如每 1 分钟的字数统计结果,每 1 分钟相加。

于 2015-04-01T03:30:23.370 回答
0

在这里发帖是因为我遇到了类似的问题,并且遇到了Spark Streaming 的Window Operations部分。在发布者的原始案例中,他们想要过去 10 分钟的计数,每 10 分钟完成一次,尽管他们的程序每 1 分钟计算一次计数。假设我们已经counts定义并计算为标准字数(即在 1 分钟的批处理持续时间内,使用元组 (word, count)),我们可以按照链接的指南定义一些内容

// Reduce/count last 10 seconds worth of data, every 10 seconds
val windowedWordCounts = counts.reduceByKeyAndWindow(_+_, Seconds(10), Seconds(10))

哪里_+_是求和函数。

于 2016-09-28T20:38:24.313 回答