4

postgresql-simple提供流式查询的功能,例如

fold 
  :: (FromRow row, ToRow params)
  => Connection -> Query -> params -> a -> (a -> row -> IO a) -> IO a

我想创建一个充分利用流媒体的管道源。

mySource :: (FromRow row, Monad m) => Source m row

不幸的是,因为IO出现在 中的逆变位置(我认为?)fold,我真的很挣扎这些类型。以下类型检查,但在产生值之前折叠整个流。

getConduit :: Connection -> IO (C.ConduitM () Event IO ())
getConduit conn = fold_ conn queryEventRecord CL.sourceNull foo
  where
    foo :: C.ConduitM () Event IO () -> Event -> IO (C.ConduitM () Event IO ())
    foo cond evt = pure (cond >> C.yield evt)

任何有关如何实现这一点的指示将不胜感激!谢谢!

4

2 回答 2

5

一种(不太好的)方法来解决这个问题

我没有办法对此进行测试,但以下应该可以

import Conduit
import Database.PostgreSQL.Simple (foreach_)
import Data.Conduit.TMChan (sourceTMChan)
import Control.Concurrent.STM.TMChan (newTMChanIO, writeTMChan, atomically)

mySource :: (FromRow row, MonadIO m) => Connection -> Query -> IO (Source m row)
mySource connection query = do
  chan <- newTMChanIO
  forEach_ connection query (atomically . writeTMChan chan)
  pure (sourceTMChan chan)

如果我们有forEach_ :: (MonadIO m, FromRow r) => Connection -> Query -> (r -> m ()) -> m ()这可能会更容易......

于 2017-01-13T22:10:21.317 回答
0

这是上面对 Alec 的修改,可以编译和运行。mkPgSource是 Alec 在帖子末尾提到的一般功能。

import Database.PostgreSQL.Simple
import Database.PostgreSQL.Simple.FromRow
import Database.PostgreSQL.Simple.ToRow
import Control.Monad.IO.Class (MonadIO)
import Data.Conduit.TMChan (sourceTMChan)
import Control.Concurrent.STM.TMChan (newTMChanIO, writeTMChan, 
closeTMChan, TMChan)
import GHC.Conc (atomically, forkIO)
import Conduit

--closes the channel after action is done to terminate the source
mkPgSource :: (MonadIO m, FromRow r) => ((r -> IO ()) -> IO ()) -> IO (Source m r)
mkPgSource action = do
  chan <- newTMChanIO
  _ <- forkIO $ do action $ atomically . (writeTMChan chan)
               atomically $ closeTMChan chan
  pure $ sourceTMChan chan

sourceQuery :: (ToRow params, FromRow r, MonadIO m) =>
     Connection -> Query -> params -> IO (Source m r)
sourceQuery conn q params = mkPgSource $ forEach conn q params

sourceQuery_ :: (FromRow r, MonadIO m) => Connection -> Query -> IO 
(Source m r)
sourceQuery_ conn q = mkPgSource $ forEach_ conn q
于 2017-06-04T02:12:46.597 回答