3

我正在尝试使用现有的 MapReduce 实现(Real World Haskell中的那个)编写一个简单的程序。

作为使用框架的一个例子,这里有一些代码来计算文件中的单词数:

module Main where

import Control.Monad (forM_)
import Data.Int (Int64)
import qualified Data.ByteString.Lazy.Char8 as LB
import System.Environment (getArgs)

import LineChunks (chunkedReadWith)
import MapReduce (mapReduce, rdeepseq)

wordCount :: [LB.ByteString] -> Int
wordCount = mapReduce rdeepseq (length . LB.words)
                      rdeepseq sum

main :: IO ()
main = do
  args <- getArgs
  forM_ args $ \path -> do
    numWords <- chunkedReadWith wordCount path
    putStrLn $ "Words: " ++ show numWords

我需要使用相同的 MapReduce 框架来编写一个程序来搜索一些字符串(比如“il”),并返回找到它们的行号。例如,输出可能是:

verILy: found on lines 34, 67, 23
ILlinois: found on lines 1, 2, 56
vILla: found on lines 4, 5, 6

(“il”的大写不是必需的。)

我是 Haskell 初学者,还没有任何具体的想法。我确实注意到 Data.ByteString.Lazy.Char8 类有一个成员函数findIndices。可以使用这个吗?

任何正确方向的代码或提示将不胜感激。

4

1 回答 1

3

好吧,让我们打这个。


首先,我们需要一种在字符串中查找单词的方法。这可以使用正则表达式来完成,比如说regex-tdfa包。Haskell 正则表达式包很好,但非常通用,一开始有点难以阅读。我们将创建一个函数,它只是匹配运算符的包装器,(=~)主要是为了使类型具体化。

wordHere :: LB.ByteString -> LB.ByteString -> Bool
wordHere word string = string =~ word
-- a.k.a.            = flip (=~)

现在,通过将大量并行的本地“映射”作业类型分配给不同的spark 来mapReduce分解列表,然后将 reduce 作业中的所有结果折叠为. 一般来说,reduce 步骤的顺序不能保证,但 RWH实际上确实给了我们这样的保证。([a] -> c)(a -> b)([b] -> c)mapReduce

RWH 的lineChunks函数实际上将文档 (LB.ByteString -> [LB.ByteString]) 拆分为少量行的块。我们的映射作业每个都获得这些块之一,并且需要在本地提供有关行匹配的信息。我们可以通过将块拆分为它们的组成行,对行进行本地编号,映射wordHere到它们,并收集wordHere返回 true 的本地行号来做到这一点。我们通常会先做它,用wordHere任何谓词替换p :: (LB.ByteString -> Bool)

import Control.Arrow

localLinesTrue :: (LB.ByteString -> Bool) -> [LB.ByteString] -> [Int]
localLinesTrue p ls = map fst . filter snd . map (second p) . zip [1..]

现在我们可以创建本地映射器,例如localLinesTrue (wordHere $ LB.pack "foobar") . LB.lines :: LB.ByteString -> [Int].

给定该类型的映射器还稍微阐明了函数其余部分的类型。我们现在有

>>> :t mapReduce rdeepseq (localLinesTrue (wordHere "foo")) rdeepseq
...    :: ([[Int]] -> c) -> [LB.ByteString] -> c

所以我们的 reducer 必须是 type ([[Int]] -> c)。酷,让我们试着做到这一点。如果我们有一个 s 列表的列表,Int我们可以重建实际的行号......

[[], [], [], [5], [3], [], []]

等等,实际上,我们不能。我们需要在映射器中添加更多信息——每个块中出现的行数。幸运的是,由于我们小心翼翼地让我们的东西解开,这很容易添加。我们需要一个更像([Int], Int)第二个参数是该块的行数的返回类型。我们可以用 "fanout" 做到这一点(&&&)

mapper regex = (localLinesTrue (wordHere regex) &&& length) . LB.lines

现在我们的输出看起来像

[ ([], 3),  ([], 5),  ([3, 5], 10), ... ]

我们可以实现一个只使用Statemonad计数的 reducer

reducerStep :: ([Int], Int) -> State Int [Int]
reducerStep (hits, count) = do pos <- get
                               modify (+count)
                               return (map (+pos) hits)

reducer :: [([Int], Int)] -> [Int]
reducer = concat . evalState 0 . mapM reducerStep

我们有

mapReduce rdeepseq (mapper regex)
          rdeepseq reducer
  :: [LB.ByteString] -> [Int]

这应该能让你完成 95% 的工作。

于 2013-03-06T18:54:09.787 回答