3

我正在使用该streaming软件包。我想S.store通过保留常量内存来使用由 定义的一个步骤的结果作为管道中后续步骤的参数。从文件中myStream加载和解析。

我有一个类型检查的以下示例:

import qualified Streaming.Prelude as S
import qualified Data.Map.Strict as M

data A = MkA deriving (Show)

insertA :: MonadIO m => S.Stream (S.Of A) m r -> m (M.Map String Int)
insertA = undefined

insertB :: MonadIO m => M.Map String Int -> S.Stream (S.Of A) m r -> m Int
insertB = undefined

myStream :: S.Stream (S.Of A) IO r
myStream = undefined

run :: IO ()
run =
  myStream
    & S.store insertA
    & insertB M.empty
    & print

但是,该行& insertB M.empty正在使用一个空地图,但我想从insertA函数中使用上一步中的地图。然后该insertB函数使用此 Map 进行查找。

我能想到的解决方案如下:

run :: IO ()
run =
  myStream
    & S.store insertA
    & ( \e -> do
          resultMap <- S.effects e
          insertB resultMap e
      )
    & print

问题

这是否保留了流媒体的好处,比如在恒定内存中运行?它如何在后台解决这个问题,因为需要将流作为一个整体进行处理才能获得Map?它多次传递相同的流 - 从文件加载它 2 次以保留常量内存?

如果是这种情况(加载文件 2 次),如果流的源不是来自解析文件,而是来自某些只能读取一次的数据流怎么办?

对于这个问题,是否还有其他优雅的解决方案也具有流式传输的好处,管道中的下一步需要使用上一步的结果?

4

1 回答 1

1

这里建议的代码有问题:

  resultMap <- S.effects e
  insertB resultMap e

问题是您两次“运行”同一个流,这对于基于流的流通常是有问题IO的。

例如,假设myStream从文件句柄中读取。当我们调用insertB第二遍时,effects已经到达文件末尾!从句柄进行的任何进一步读取都不会返回任何数据。

当然,我们可以用两个不同的流读取同一个文件两次。这保留了流媒体,但需要两次通过。


应该注意的是,对于某些具有内置资源管理的基本 monad,例如resourcet,您可以运行相同的Stream值两次,因为流代码足够“智能”,可以在每次流时分配和释放底层资源跑。

例如,linear-base中存在的Stream类型版本支持函数:readFile

readFile :: FilePath -> Stream (Of Text) RIO ()

它返回一个Stream资源感知IO中工作。

也就是说,我不喜欢在流管道中隐藏对文件的重复读取,这让我感到困惑。

于 2022-02-09T13:01:00.283 回答