1

使用Haskell-streaming,我可以轻松地对流进行分组并对每个组进行求和。

>>> S.print $ mapped S.toList $ S.groupBy (\ x y -> x*y>0) $ each [-1,-2,3,4,5,-6]
[-1,-2]
[3,4,5]
[-6]

>>> S.print $S.map sum $ mapped S.toList $ S.groupBy (\ x y -> x*y>0) $ each [-1,-2,3,4,5,-6]
-3
12
-6

如何有一个函数myfn以一种顺序敏感的方式生成一个将上述两者合并的流?即我希望有一个结果流

>>> myfn $ each [-1,-2,3,4,5,-6]
-1:> -2:> -3:> 3:> 4:> 5:> 12:> -6:> -6:> ()
4

1 回答 1

1

该解决方案涉及使函数参数在一次通过中mapped累积列表计算总和。

我认为可以做到这一点store,但我发现foldl的流媒体接收器更易于使用。他们的Applicative实例让我们可以从更简单的构建复合Folds:

foo :: Monad m 
    => (Int -> Int -> Bool) 
    -> Stream (Of Int) m ()
    -> Stream (Of Int) m ()
foo p = 
      flip S.for (\(xs,total) -> S.each xs *> S.yield total)
    . mapped (L.purely S.fold $ (,) <$> L.list <*> L.sum)
    . S.groupBy p

其中L.purelyL.listL.sum来自“foldl”。

画龙点睛的是取出每一对([Int],Int)mapped并用一个子流替换它for

让它发挥作用:

*Main> S.print $ foo (\x y -> x*y>0) $ S.each [-1,-2,3,4,5,-6]

编辑:想想看,以前的解决方案是有缺陷的。我们只对流式传输S.toList的结果感兴趣,但我们使用或L.list在将其发送到下游之前将每个单独的组累积在内存中。但是,如果一组恰好大于机器中的可用内存怎么办?

这是一个完美流式传输且与每个组的大小无关的解决方案:

foo :: Monad m 
    => (Int -> Int -> Bool) 
    -> Stream (Of Int) m ()
    -> Stream (Of Int) m ()
foo p = 
      concats
    . S.maps (S.store (\s -> do (total :> r) <- L.purely S.fold L.sum s
                                S.yield total
                                return r))
    . S.groupBy p

发生了什么变化?首先,我们使用maps而不是mapped,因为现在我们要转换子组流,而不是在基本 monad 中返回结果。

对于每个子组流,我们使用store执行求和折叠而不破坏流。然后我们获取折叠的结果并将其附加回流中,同时还按照maps.

剩下的唯一步骤是使用 重新加入子组concats

于 2020-02-21T17:26:03.383 回答