Tikhon 的解决方案是最简单的解决方案,但它有一个主要缺陷:在处理整个列表之前它不会产生任何结果,如果处理太大的列表会溢出。
更接近 C# 的 Rx 的解决方案是使用流式库,例如pipes
.
例如,您可以定义从用户输入Producer
生成s 的 a:String
import Control.Monad
import Control.Proxy
lines' :: (Proxy p) => () -> Producer p String IO r
lines' () = runIdentityP $ forever $ do
str <- lift getLine
respond str
然后你可以定义一个需要 10 行的阶段:
take' :: (Monad m, Proxy p) => Int -> () -> Pipe p a a m ()
take' n () = runIdentityP $ replicateM_ n $ do
a <- request ()
respond a
...然后是处理阶段:
proc :: (Monad m, Proxy p) => () -> Pipe p String (String, Int) m r
proc () = runIdentityP $ forever $ do
str <- request ()
respond (str, length str)
...和最终输出阶段:
print' :: (Proxy p, Show a) => () -> Consumer p a IO r
print' () = runIdentityP $ forever $ do
a <- request ()
lift $ print a
现在您可以将它们组合成一个处理链并运行它:
main = runProxy $ lines' >-> take' 10 >-> proc >-> print'
...它会在输入每一行后立即输出处理后的结果,而不是在最后以批处理的形式提供结果:
$ ./pipes
Apple<Enter>
("Apple",5)
Test<Enter>
("Test",4)
123<Enter>
("123",3)
4<Enter>
("4",1)
5<Enter>
("5",1)
6<Enter>
("6",1)
7<Enter>
("7",1)
8<Enter>
("8",1)
9<Enter>
("9",1)
10<Enter>
("10",2)
$
实际上,您不必自己定义这些管道。pipes
您可以从标准库中的组件组装相同的链:
>>> runProxy $ stdinS >-> takeB_ 10 >-> mapD (\x -> (x, length x)) >-> printD
<exact same behavior>