7

我编写了一个程序来计算语料库中 NGrams 的频率。我已经有了一个函数,它使用一个令牌流并生成一个订单的 NGram:

ngram :: Monad m => Int -> Conduit t m [t]
trigrams = ngram 3
countFreq :: (Ord t, Monad m) => Consumer [t] m (Map [t] Int)

目前我只能将一个流消费者连接到流源:

tokens --- trigrams --- countFreq

如何将多个流消费者连接到同一个流源?我想要这样的东西:

           .--- unigrams --- countFreq
           |--- bigrams  --- countFreq
tokens ----|--- trigrams --- countFreq
           '--- ...      --- countFreq

一个优点是并行运行每个消费者

编辑: 感谢 Petr 我想出了这个解决方案

spawnMultiple orders = do
    chan <- atomically newBroadcastTMChan

    results <- forM orders $ \_ -> newEmptyMVar
    threads <- forM (zip results orders) $
                        forkIO . uncurry (sink chan)

    forkIO . runResourceT $ sourceFile "test.txt"
                         $$ javascriptTokenizer
                         =$ sinkTMChan chan

    forM results readMVar

    where
        sink chan result n = do
            chan' <- atomically $ dupTMChan chan
            freqs <- runResourceT $ sourceTMChan chan'
                                 $$ ngram n
                                 =$ frequencies
            putMVar result freqs
4

1 回答 1

6

我假设您希望所有接收器都接收所有值。

我建议:

  1. 用于newBroadcastTMChan创建新频道Control.Concurrent.STM.TMChan(stm-chans)。
  2. 使用此通道为您的主要生产者使用sinkTBMChanfrom Data.Conduit.TMChan(stm-conduit) 构建接收器。
  3. 为每个客户使用dupTMChan创建自己的副本以供阅读。启动一个新线程,该线程将使用sourceTBMChan.
  4. 从您的线程中收集结果。
  5. 确保您的客户端可以像生成数据一样快地读取数据,否则您可能会出现堆溢出。

(我没试过,让我们知道它是如何工作的。)


更新:MVar收集结果的一种方法是为每个消费者线程创建一个。他们每个人都会putMVar在完成后得到结果。并且您的主线程将takeMVar在所有这些MVars 上,从而等待每个线程完成。例如,如果vars是您MVar的 s 的列表,则主线程将发出mapM takeMVar vars以收集所有结果。

于 2013-07-29T19:08:16.107 回答