5

我希望看到源的不确定性交错操作,其类型签名如

interleave :: WhateverIOMonadClassItWouldWant m => [(k, Source m a)] -> Source m (k, a)

用例是我有一个 p2p 应用程序,它维护与网络上许多节点的开放连接,并且它主要只是坐在那里等待来自其中任何一个节点的消息。当消息到达时,它并不关心它来自哪里,而是需要尽快处理该消息。从理论上讲,这种应用程序(至少在用于类似套接字的源时)可以完全绕过 GHC 的 IO 管理器并运行select/ epoll/etc。直接调用,但我并不特别关心它是如何实现的,只要它有效。

使用导管可以实现这样的事情吗?一种不太通用但可能更可行的方法可能是编写一个[(k, Socket)] -> Source m (k, ByteString)函数来为您处理所有套接字上的接收。

我注意到了ResumableSource管道中的操作,但他们似乎都想知道一个特定的Sink,这感觉有点像抽象泄漏,至少对于这个操作来说是这样。

4

2 回答 2

5

stm-conduit 包提供了mergeSources,它执行与您正在寻找的内容相似但不相同的事情。这可能是一个很好的起点。

于 2012-07-14T18:45:49.010 回答
3

是的,有可能。

您可以通过分叉线程轮询一堆Sources 而不会阻塞,以轮询在每个线程中您与将输出发送到某个并发通道的Sourcea配对的位置:Sink

concur :: (WhateverIOMonadClassItWouldWant m) => TChan a -> Sink a m r

...然后您定义Source从该通道读取的 a :

synchronize :: (WhateverIOMonadClassItWouldWant m) => TChan a -> Source a m r

请注意,这与仅分叉线程以轮询套接字本身没有什么不同,但对于conduit可能希望使用Source他们定义的 s 轮询套接字以外的其他事物的其他用户来说,这将是有用的,因为它更通用。

如果您将这些功能组合到一个函数中,那么调用的整体 API 将类似于:

poll :: (WhateverIOMonadClassItWouldWant m) => [Source a m r] -> m (Source a m r)

...但如果你愿意,你仍然可以把那些ks 扔进去。

于 2012-07-14T14:00:29.847 回答