5

你总是听说函数式代码本质上比非函数式代码更容易并行化,所以我决定编写一个函数来执行以下操作:

给定一个字符串输入,合计每个字符串的唯一字符数。因此,给定输入[ "aaaaa"; "bbb"; "ccccccc"; "abbbc" ],我们的方法将返回a: 6; b: 6; c: 8

这是我写的:

(* seq<#seq<char>> -> Map<char,int> *)
let wordFrequency input =
    input
    |> Seq.fold (fun acc text ->
        (* This inner loop can be processed on its own thread *)
        text
        |> Seq.choose (fun char -> if Char.IsLetter char then Some(char) else None)
        |> Seq.fold (fun (acc : Map<_,_>) item ->
            match acc.TryFind(item) with
            | Some(count) -> acc.Add(item, count + 1)
            | None -> acc.Add(item, 1))
            acc
        ) Map.empty

这段代码在理想情况下是可并行化的,因为其中的每个字符串input都可以在其自己的线程上处理。它并不像看起来那么简单,因为内部循环将项目添加到所有输入之间共享的 Map 中。

我希望将内部循环分解到自己的线程中,并且我不想使用任何可变状态。我将如何使用异步工作流重写此函数?

4

4 回答 4

3

你可以这样写:

let wordFrequency =
  Seq.concat >> Seq.filter System.Char.IsLetter >> Seq.countBy id >> Map.ofSeq

并仅使用两个额外的字符将其并行化以使用DLL 中的PSeq模块FSharp.PowerPack.Parallel.Seq而不是普通Seq模块:

let wordFrequency =
  Seq.concat >> PSeq.filter System.Char.IsLetter >> PSeq.countBy id >> Map.ofSeq

例如,从 5.5Mb King James 圣经中计算频率所需的时间从 4.75s 下降到 0.66s。这是这台 8 核机器的 7.2 倍加速。

于 2010-02-13T07:18:13.937 回答
2

正如已经指出的那样,如果您尝试让不同的线程处理不同的输入字符串,则会出现更新争用,因为每个线程都可以增加每个字母的计数。您可以让每个线程生成自己的地图,然后“将所有地图相加”,但最后一步可能会很昂贵(并且由于共享数据而不太适合使用线程)。我认为大型输入可能会使用如下算法运行得更快,其中每个线程处理不同的字母计数(对于输入中的所有字符串)。因此,每个线程都有自己独立的计数器,因此没有更新争用,也没有最后一步来组合结果。然而,我们需要预处理来发现“唯一字母集”,并且这一步确实存在相同的争用问题。(在实践中,

#light

let input = [| "aaaaa"; "bbb"; "ccccccc"; "abbbc" |]

// first discover all unique letters used
let Letters str = 
    str |> Seq.fold (fun set c -> Set.add c set) Set.empty 
let allLetters = 
    input |> Array.map (fun str -> 
        async { return Letters str })
    |> Async.Parallel 
    |> Async.Run     
    |> Set.union_all // note, this step is single-threaded, 
        // if input has many strings, can improve this

// Now count each letter on a separate thread
let CountLetter letter =
    let mutable count = 0
    for str in input do
        for c in str do
            if letter = c then
                count <- count + 1
    letter, count
let result = 
    allLetters |> Seq.map (fun c ->
        async { return CountLetter c })
    |> Async.Parallel 
    |> Async.Run

// print results
for letter,count in result do
    printfn "%c : %d" letter count

我确实已经“完全改变了算法”,主要是因为我原来的算法由于更新争用而不特别适合直接数据并行化。根据您要学习的具体内容,这个答案可能会或可能不会让您特别满意。

于 2009-01-05T19:40:09.747 回答
1

正如Don Syme 解释的那样,并行与异步不同。

因此,IMO 最好使用 PLINQ 进行并行化。

于 2009-01-14T04:36:12.913 回答
0

我根本不会说 F#,但我可以解决这个问题。考虑使用 map/reduce:

n = card(Σ)为字母 Σ 中符号 σ 的数量。

地图阶段:

产生n 个进程,其中第i个进程的分配是统计符号σ i在整个输入向量中出现的次数。

减少阶段

按顺序收集n 个进程中每个进程的总数。该向量是您的结果。

现在,这个版本不会对串行版本产生任何改进;我怀疑这里有一个隐藏的依赖关系,这使得这本质上难以并行化,但我太累了,而且今晚无法证明这一点。

于 2009-01-05T04:29:13.597 回答