5

我想处理通过 MQTT 接收到的事件流。我正在使用的库使用回调来提供结果。我正在做的处理取决于先前的状态,而不仅仅是最新的事件。此外,将来可能会从其他来源收集事件。

起初,我决定将其编入列表中,这听起来是个好主意。我有一个小问题,因为 IO 阻止了延迟评估并且等待无限流可能很长,但我通过交错 IO 解决了它。

stream :: IO [Event]允许我做一些不错的事情,比如foldl, foldM map, mapM, 等等......不幸的是,使用这种方法我宁愿不能组合两个流,因为那里没有更多的锁定功能。

我正在挖掘许多库,例如发现带有 TQueue 的 STM。不幸的是,这不是我真正想要的。

我决定创建自定义类型并制作它,Foldable这样我就可以折叠它。我因为IO失败了。

import Control.Concurrent.STM

newtype Stream a = Stream (STM a)

runStream
  :: ((a -> IO ()) -> IO i)
  -> IO (Stream a)
runStream block = do
  queue <- newTQueueIO
  block (atomically . writeTQueue queue)
  return $ Stream (readTQueue queue)

foldStream :: (a -> b -> IO b) -> b -> Stream a -> IO b
foldStream f s (Stream read) = do
  n <- atomically read
  m <- f n s
  foldStream f m (Stream read)

mapStream :: (a -> b) -> Stream a -> Stream b
mapStream f (Stream read) = Stream $ f <$> read

zipStream :: [Stream a] -> Stream a
zipStream = undefined

Whih 可以像这样使用main = foldStream (\x _ -> print x) () =<< events

是否可以像使用常规列表一样实现一些基类以使用此流?

4

1 回答 1

4

在这些情况下,通常的技巧是让回调写入队列,然后从队列的另一端读取。

使用stm-chans包中的有界、可关闭队列,我们​​可以定义这个函数:

import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue

foldQueue :: TBMQueue a -> (x -> a -> IO x) -> IO x -> (x -> IO b) -> IO b
foldQueue queue step start done =
    let go state = 
            do m <- atomically (readTBMQueue queue)
               case m of 
                   Nothing -> done state
                   Just a  -> step state a >>= go
     in start >>= go

它采用通道、一个阶跃函数(类似于 所需的foldM)、一个获取初始状态的动作和一个返回最终结果的“完成”动作,然后从通道提供数据直到它关闭。请注意,折叠状态x是由 的调用者选择的foldQueue

如果以后我们想从foldl包升级到单子折叠——它有一个非常有用Applicative的实例——我们可以这样做:

import qualified Control.Foldl as L

foldQueue' :: TBMQueue a -> L.FoldM IO a b -> IO b 
foldQueue' queue = L.impurely (foldQueue queue)

impurely从“foldl”包中使用。

有时(例如在解析、分组或解码时)使用基于拉取的消费者更容易。我们可以使用包来做到这一点:

import Streaming
import qualified Streaming.Prelude as S

foldQueue' :: TBMQueue a -> (Stream (Of a) IO () -> IO r) -> IO r
foldQueue' queue consume = consume (S.untilRight (do
    m <- atomically (readTBMQueue queue)
    return (case m of
        Nothing -> Right ()
        Just a -> Left a)))

给定一个使用流的函数,我们向它提供从队列中读取的值流。

通常,从通道读取和写入必须在不同的线程中发生。我们可以使用concurrentlyfrom async之类的函数来干净地处理它。

于 2019-08-07T18:17:26.280 回答