大多数数据处理可以设想为组件的管道,一个组件的输出馈入另一个组件的输入。一个典型的处理管道是:
reader | handler | writer
作为开始这个讨论的陪衬,让我们考虑这个管道的面向对象的实现,其中每个段都是一个对象。该handler
对象包含对reader
和writer
对象的引用,并具有run
如下所示的方法:
define handler.run:
while (reader.has_next) {
data = reader.next
output = ...some function of data...
writer.put(output)
}
示意性地,依赖项是:
reader <- handler -> writer
现在假设我想在阅读器和处理程序之间插入一个新的管道段:
reader | tweaker | handler | writer
同样,在这个 OO 实现中,tweaker
将是reader
对象的包装器,并且tweaker
方法可能看起来像(在一些伪命令式代码中):
define tweaker.has_next:
return reader.has_next
define tweaker.next:
value = reader.next
result = ...some function of value...
return result
我发现这不是一个非常可组合的抽象。一些问题是:
tweaker
只能在 的左侧使用handler
,即我不能使用上面的实现tweaker
来形成这个管道:读者 | 处理程序 | 调整器 | 作家
我想利用管道的关联属性,以便该管道:
读者 | 处理程序 | 作家
可以表示为:
reader | p
p
管道在哪里handler | writer
。在这个 OO 实现中,我必须部分实例化handler
对象
- 有点重述(1),对象必须知道它们是“推”还是“拉”数据。
我正在寻找一个框架(不一定是 OO)来创建解决这些问题的数据处理管道。
我用Haskell
和标记了它,functional programming
因为我觉得函数式编程概念在这里可能有用。
作为一个目标,能够创建这样的管道会很好:
handler1
/ \
reader | partition writer
\ /
handler2
从某种角度来看,Unix shell 管道通过以下实现决策解决了很多这些问题:
管道组件在不同的进程中异步运行
管道对象在“推动者”和“拉动者”之间调解传递数据;即,它们阻止写入数据过快的写入器和尝试读取过快的读取器。
您使用特殊的连接器
<
并将>
无源组件(即文件)连接到管道
我对在代理之间不使用线程或消息传递的方法特别感兴趣。也许这是最好的方法,但如果可能的话,我想避免线程。
谢谢!