18

大多数数据处理可以设想为组件的管道,一个组件的输出馈入另一个组件的输入。一个典型的处理管道是:

reader | handler | writer

作为开始这个讨论的陪衬,让我们考虑这个管道的面向对象的实现,其中每个段都是一个对象。该handler对象包含对readerwriter对象的引用,并具有run如下所示的方法:

define handler.run:
  while (reader.has_next) {
    data = reader.next
    output = ...some function of data...
    writer.put(output)
  }

示意性地,依赖项是:

reader <- handler -> writer

现在假设我想在阅读器和处理程序之间插入一个新的管道段:

reader | tweaker | handler | writer

同样,在这个 OO 实现中,tweaker将是reader对象的包装器,并且tweaker方法可能看起来像(在一些伪命令式代码中):

define tweaker.has_next:
  return reader.has_next

define tweaker.next:
  value = reader.next
  result = ...some function of value...
  return result

我发现这不是一个非常可组合的抽象。一些问题是:

  1. tweaker只能在 的左侧使用handler,即我不能使用上面的实现tweaker来形成这个管道:

    读者 | 处理程序 | 调整器 | 作家

  2. 我想利用管道的关联属性,以便该管道:

    读者 | 处理程序 | 作家

可以表示为:

reader | p

p管道在哪里handler | writer。在这个 OO 实现中,我必须部分实例化handler对象

  1. 有点重述(1),对象必须知道它们是“推”还是“拉”数据。

我正在寻找一个框架(不一定是 OO)来创建解决这些问题的数据处理管道。

我用Haskell和标记了它,functional programming因为我觉得函数式编程概念在这里可能有用。

作为一个目标,能够创建这样的管道会很好:

                     handler1
                   /          \
reader | partition              writer
                   \          /
                     handler2

从某种角度来看,Unix shell 管道通过以下实现决策解决了很多这些问题:

  1. 管道组件在不同的进程中异步运行

  2. 管道对象在“推动者”和“拉动者”之间调解传递数据;即,它们阻止写入数据过快的写入器和尝试读取过快的读取器。

  3. 您使用特殊的连接器<并将>无源组件(即文件)连接到管道

我对在代理之间不使用线程或消息传递的方法特别感兴趣。也许这是最好的方法,但如果可能的话,我想避免线程。

谢谢!

4

4 回答 4

22

是的,几乎肯定是你的人。

我怀疑您对 Haskell 相当陌生,仅基于您在问题中所说的内容。箭头可能看起来相当抽象,特别是如果您正在寻找的是“框架”。我知道我花了一段时间才真正了解箭头发生了什么。

因此,您可能会看着该页面并说“是的,这看起来像我想要的”,然后发现自己对如何开始使用箭头来解决问题感到迷茫。所以这里有一些指导,所以你知道你在看什么。

箭头不会解决你的问题。相反,它们为您提供了一种可以用来表达问题的语言。你可能会发现一些预定义的箭头可以完成这项工作——可能是一些 kleisli 箭头——但最终你会想要实现一个箭头(预定义的只是给你简单的方法来实现它们),它表示您所说的“数据处理器”是什么意思。作为一个几乎微不足道的例子,假设你想通过简单的函数来实现你的数据处理器。你会写:

newtype Proc a b = Proc { unProc :: a -> b }

-- I believe Arrow has recently become a subclass of Category, so assuming that.

instance Category Proc where
    id = Proc (\x -> x)
    Proc f . Proc g = Proc (\x -> f (g x))

instance Arrow Proc where
    arr f = Proc f
    first (Proc f) = Proc (\(x,y) -> (f x, y))

这为您提供了使用各种箭头组合器 、 、 等的机制(***)(&&&)以及(>>>)箭头符号,如果您正在做复杂的事情,这是相当不错的。因此,正如 Daniel Fischer 在评论中指出的那样,您在问题中描述的管道可以组成为:

reader >>> partition >>> (handler1 *** handler2) >>> writer

但很酷的是,处理器的含义取决于您。可以使用不同的处理器类型以类似的方式实现您提到的关于每个处理器分叉线程的内容:

newtype Proc' a b = Proc (Source a -> Sink b -> IO ())

然后适当地实施组合器。

这就是您正在查看的内容:用于讨论组合流程的词汇表,其中有一些代码可以重用,但主要会在您实现这些组合器以定义在您的域中有用的处理器时帮助指导您的思考.

我的第一个重要的 Haskell 项目之一是为量子纠缠实现箭头。那个项目让我真正开始理解 Haskell 的思维方式,这是我编程生涯的一个重要转折点。也许你的这个项目也会为你做同样的事情?:-)

于 2011-11-15T23:30:16.520 回答
7

由于惰性求值,我们可以用 Haskell 中的普通函数组合来表达管道。这是一个计算文件中行的最大长度的示例:

main = interact (show . maximum . map length . lines)

这里的一切都是一个普通的功能,例如

lines :: String -> [String]

但是由于惰性求值,这些函数只能增量处理输入,并且只处理需要的量,就像 UNIX 管道一样。

于 2011-11-16T10:30:01.893 回答
4

Haskell的枚举器包是一个很好的框架。它定义了三种类型的对象:

  1. 以块的形式生成数据的枚举器。
  2. 消耗大量数据并在消耗足够多后返回值的迭代。
  3. 位于管道中间的枚举数。它们消耗块并产生块,可能有副作用。

这三种类型的对象组成了一个流处理管道,您甚至可以在一个管道中拥有多个枚举器和迭代器(当一个完成时,下一个将取代它)。从头开始编写其中一个对象可能很复杂,但是有很多组合器可用于将常规函数转换为数据流处理器。例如,此管道从标准输入读取所有字符,使用函数将它们转换为大写toUpper,然后将它们写入标准输出:

ET.enumHandle stdin $$ ET.map toUpper =$ ET.iterHandle stdout

其中模块Data.Enumerator.Text已导入为ET.

于 2011-11-16T15:33:46.717 回答
2

Yesod框架以管道包的形式使用 Haskell 管道库。

于 2012-10-10T07:34:42.830 回答