我想处理通过 MQTT 接收到的事件流。我正在使用的库使用回调来提供结果。我正在做的处理取决于先前的状态,而不仅仅是最新的事件。此外,将来可能会从其他来源收集事件。
起初,我决定将其编入列表中,这听起来是个好主意。我有一个小问题,因为 IO 阻止了延迟评估并且等待无限流可能很长,但我通过交错 IO 解决了它。
stream :: IO [Event]
允许我做一些不错的事情,比如foldl
, foldM
map
, mapM
, 等等......不幸的是,使用这种方法我宁愿不能组合两个流,因为那里没有更多的锁定功能。
我正在挖掘许多库,例如发现带有 TQueue 的 STM。不幸的是,这不是我真正想要的。
我决定创建自定义类型并制作它,Foldable
这样我就可以折叠它。我因为IO失败了。
import Control.Concurrent.STM
newtype Stream a = Stream (STM a)
runStream
:: ((a -> IO ()) -> IO i)
-> IO (Stream a)
runStream block = do
queue <- newTQueueIO
block (atomically . writeTQueue queue)
return $ Stream (readTQueue queue)
foldStream :: (a -> b -> IO b) -> b -> Stream a -> IO b
foldStream f s (Stream read) = do
n <- atomically read
m <- f n s
foldStream f m (Stream read)
mapStream :: (a -> b) -> Stream a -> Stream b
mapStream f (Stream read) = Stream $ f <$> read
zipStream :: [Stream a] -> Stream a
zipStream = undefined
Whih 可以像这样使用main = foldStream (\x _ -> print x) () =<< events
是否可以像使用常规列表一样实现一些基类以使用此流?