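I wrote a program that counts the frequencies of n-grams in a corpus. I already have a function that consumes a stream of tokens and produces the n-grams of a single order: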
ngram :: Monad m => Int -> Conduit t m [t]
trigrams = ngram 3
countFreq :: (Ord t, Monad m) => Consumer [t] m (Map [t] Int)
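To make the intended semantics of these two signatures concrete, here is a minimal pure-list sketch of the same computation. The names `ngramList` and `countFreqList` are hypothetical and only illustrate what the streaming versions are expected to compute; they are not the actual implementations.

```haskell
import Data.List (tails)
import qualified Data.Map.Strict as Map

-- Hypothetical list analogue of `ngram`: every contiguous window of
-- length n, in input order. Shorter trailing windows are dropped.
ngramList :: Int -> [t] -> [[t]]
ngramList n = takeWhile ((== n) . length) . map (take n) . tails

-- Hypothetical list analogue of `countFreq`: occurrences of each n-gram.
countFreqList :: Ord t => [[t]] -> Map.Map [t] Int
countFreqList = Map.fromListWith (+) . map (\g -> (g, 1))

main :: IO ()
main = print (countFreqList (ngramList 2 (words "a b a b c")))
```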
Currently I can only connect a single stream consumer to the stream source:
tokens --- trigrams --- countFreq
How can I connect multiple stream consumers to the same stream source? I would like something like this:
           .--- unigrams --- countFreq
           |--- bigrams  --- countFreq
tokens ----|--- trigrams --- countFreq
           '--- ...      --- countFreq
A plus would be to run each consumer in parallel.
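For comparison, the fan-out in the diagram can be sketched without any threads using `sequenceSinks` from `Data.Conduit`, which feeds every upstream value to each sink in a collection. This sketch assumes conduit 1.3 or later (`ConduitT`, `.|`, `runConduitPure`). Its `ngram` and `countFreq` are hypothetical stand-ins with the same shape as the signatures above, and `countAll` is a name made up for illustration. It does not give the parallelism asked for: all consumers run interleaved in one thread.

```haskell
import Control.Monad (when)
import Data.Conduit (ConduitT, await, yield, runConduitPure, sequenceSinks, (.|))
import qualified Data.Conduit.List as CL
import qualified Data.Map.Strict as Map
import Data.Void (Void)

-- Hypothetical stand-in for `ngram`: a sliding window of size n.
-- The window is kept newest-first and reversed when emitted.
ngram :: Monad m => Int -> ConduitT t [t] m ()
ngram n = go []
  where
    go window = do
      mx <- await
      case mx of
        Nothing -> return ()
        Just x  -> do
          let window' = take n (x : window)
          when (length window' == n) $ yield (reverse window')
          go window'

-- Hypothetical stand-in for `countFreq`: fold the n-grams into a map.
countFreq :: (Ord t, Monad m) => ConduitT [t] Void m (Map.Map [t] Int)
countFreq = CL.fold (\m g -> Map.insertWith (+) g 1 m) Map.empty

-- One sink per order; every token is passed to all of them.
countAll :: (Ord t, Monad m) => [Int] -> ConduitT t Void m [Map.Map [t] Int]
countAll orders = sequenceSinks [ ngram n .| countFreq | n <- orders ]

main :: IO ()
main = do
  let tokens  = words "a b a b c"
      results = runConduitPure (CL.sourceList tokens .| countAll [1, 2, 3])
  mapM_ print results
```

The result list has one frequency map per requested order, in the same order as the input list.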
EDIT: Thanks to Petr, I came up with this solution:
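spawnMultiple orders = do
    chan <- atomically newBroadcastTMChan
    results <- forM orders $ \_ -> newEmptyMVar
    -- Duplicate the channel once per consumer before the producer starts:
    -- a duplicate only receives values written after it was created.
    chans <- forM orders $ \_ -> atomically (dupTMChan chan)
    -- One consumer thread per n-gram order.
    forM_ (zip3 chans results orders) $ \(chan', result, n) ->
        forkIO (sink chan' result n)
    -- Producer: tokenize the file and broadcast the tokens.
    forkIO . runResourceT $ sourceFile "test.txt"
                         $$ javascriptTokenizer
                         =$ sinkTMChan chan
    -- Block until every consumer has delivered its frequency table.
    forM results readMVar
    where
        sink chan' result n = do
            freqs <- runResourceT $ sourceTMChan chan'
                                 $$ ngram n
                                 =$ frequencies
            putMVar result freqs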