1

我对 Spark 还是很陌生,我正在努力实现一个迭代函数。我希望有人可以帮助我吗?

特别是,我正在尝试实现CUSUM控制统计:

$ S_i = \max (0, S_{i-1} + x_i - Target - w $ 其中 $ S_0 = 0 $ 和 $ w, Target $ 是固定参数。

挑战在于 CUSUM 统计量被定义为需要有序数据和先前函数值的迭代函数。

以下数据框显示了 $ Target = 1 $ 和 $ w = 0.1 $ 的所需输出:

i    x    S
--------------
1    1.3  0.2
2    1.8  0.9
3    0.5  0.3
4    0.6  0
5    1.2  0.1
6    1.8  0.8

另一方面:我想不可能以分布式方式运行 CUSUM?我的数据集相当大,但包含多个组。我希望这意味着我仍然可以实现一些并发。我想我必须重新分区我的数据以使每组有一个分区才能同时运行每组的 CUSUM 算法?

我希望这是有道理的,任何指针都受到高度赞赏!理想情况下,我正在寻找 Scala 和 Spark 2.1 中的解决方案

非常感谢!

4

1 回答 1

0

经过大量谷歌研究,我找到了解决问题的方法mapPartitions

val dataset = Seq(1.3, 1.8, 0.5, 0.6, 1.2, 1.8).toDS

dataset.repartition(1).mapPartitions(iterator => {
    var s = 0.0
    val target = 1.0
    val w = 0.1
    iterator.map(x => {
        s = Math.max(0.0, s + x -target - w)
        Math.round(10.0 *s)/10.0
    })
}).show()

+-----+
|value|
+-----+
|  0.2|
|  0.9|
|  0.3|
|  0.0|
|  0.1|
|  0.8|
+-----+

我希望这会在未来节省一些时间。

于 2017-04-23T19:43:34.597 回答