8
(fileNameToCharStream "大文件"
 |>> 保险丝[长度;
           splitBy (fun x -> x = ' ' || x = '\n') removeEmpty |>> 长度;
           splitBy (fun x -> x = '\n') keepEmpty |>> 长度;
         ])
  (*fuse "fuses" 三个函数同时运行*)
 |> run 2 (*强制在两个线程上并行运行*)
 |> (fun [num_chars; num_words; num_lines] ->
       printfn "%d %d %d"
           num_chars num_words, num_lines))

我想让这段代码按以下方式工作:将原始流分成两个正好在中间;然后对于每一半运行一个单独的计算,计算 3 件事:长度(即字符数)、单词数、行数。但是,如果我错误地分割了一个单词,我不想有问题。必须注意这一点。该文件应该只读一次。

我应该如何对指定的函数和运算符 |>> 进行编程?可能吗?

4

1 回答 1

8

看起来你的要求很高。我将由您来确定字符串操作,但我将向您展示如何定义一个并行执行一系列操作的运算符。

第 1 步:编写fuse函数

您的 fuse 函数似乎使用多个函数映射单个输入,这很容易编写如下:

//val fuse : seq<('a -> 'b)> -> 'a -> 'b list
let fuse functionList input = [ for f in functionList -> f input]

请注意,您的所有映射函数都需要具有相同的类型。

第 2 步:定义运算符以并行执行函数

标准的并行映射函数可以写成如下:

//val pmap : ('a -> 'b) -> seq<'a> -> 'b array
let pmap f l =
    seq [for a in l -> async { return f a } ]
    |> Async.Parallel
    |> Async.RunSynchronously

据我所知,Async.Parallel将并行执行异步操作,其中在任何给定时间执行的并行任务数等于机器上的内核数(如果我错了,有人可以纠正我)。因此,在双核机器上,调用此函数时,我的机器上最多应该运行 2 个线程。这是一件好事,因为我们不希望通过每个内核运行多个线程来提高任何速度(事实上,额外的上下文切换可能会减慢速度)。

我们可以|>>pmapand定义一个运算符fuse

//val ( |>> ) : seq<'a> -> seq<('a -> 'b)> -> 'b list array
let (|>>) input functionList = pmap (fuse functionList) input

因此,|>>操作员获取大量输入并使用许多不同的输出映射它们。到目前为止,如果我们将所有这些放在一起,我们会得到以下结果(在 fsi 中):

> let countOccurrences compareChar source =
    source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0)

let length (s : string) = s.Length

let testData = "Juliet is awesome|Someone should give her a medal".Split('|')
let testOutput =
    testData
    |>> [length; countOccurrences 'J'; countOccurrences 'o'];;

val countOccurrences : 'a -> seq<'a> -> int
val length : string -> int
val testData : string [] =
  [|"Juliet is awesome"; "Someone should give her a medal"|]
val testOutput : int list array = [|[17; 1; 1]; [31; 0; 3]|]

testOutput包含两个元素,这两个元素都是并行计算的。

第 3 步:将元素聚合到单个输出中

好的,现在我们有了数组中每个元素表示的部分结果,我们希望将部分结果合并到一个聚合中。我假设数组中的每个元素都应该合并相同的函数,因为输入中的每个元素都具有相同的数据类型。

这是我为这项工作编写的一个非常丑陋的函数:

> let reduceMany f input =
    input
    |> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ]);;

val reduceMany : ('a -> 'a -> 'a) -> seq<'a list> -> 'a list

> reduceMany (+) testOutput;;
val it : int list = [48; 1; 4]

reduceMany接受长度为 n 的序列的序列,并返回一个长度为 n 的数组作为输出。如果你能想到更好的方法来编写这个函数,请成为我的客人 :)

要解码上面的输出:

  • 48 = 我的两个输入字符串的长度之和。请注意,原始字符串是 49 个字符,但将其拆分为“|” 每个“|”吃掉一个字符。
  • 1 = 我输入中所有“J”实例的总和
  • 4 = 'O' 的所有实例的总和。

第 4 步:将所有内容放在一起

let pmap f l =
    seq [for a in l -> async { return f a } ]
    |> Async.Parallel
    |> Async.RunSynchronously

let fuse functionList input = [ for f in functionList -> f input]

let (|>>) input functionList = pmap (fuse functionList) input

let reduceMany f input =
    input
    |> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ])

let countOccurrences compareChar source =
    source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0)

let length (s : string) = s.Length

let testData = "Juliet is awesome|Someone should give her a medal".Split('|')
let testOutput =
    testData
    |>> [length; countOccurrences 'J'; countOccurrences 'o']
    |> reduceMany (+)
于 2009-09-30T18:04:01.473 回答