0

我在 F# 中有以下代码,认为它足够并发以利用我机器的 4 个内核。然而,cpu 的使用仅限于一个核心。

    member x.Solve problemDef =
        use flag = new ManualResetEventSlim(false)
        let foundSoFar = MSet<'T>()
        let workPile = MailboxProcessor<seq<'T>>.Start(fun inbox ->
            let remaining = ref 0
            let rec loop() = async {
                let! data = inbox.Receive()
                let data = data |> Seq.filter (not << foundSoFar.Contains) |> Array.ofSeq
                foundSoFar.UnionWith data
                let jobs = ref -1
                for chunk in data |> Seq.distinct |> Seq.chunked 5000 do
                    Async.Start <| async {
                        Seq.collect problemDef.generators chunk
                        |> Array.ofSeq
                        |> inbox.Post
                    }
                    incr jobs
                remaining := !remaining + !jobs
                if (!remaining = 0 && !jobs = -1) then
                    flag.Set() |> ignore
                else 
                    return! loop()
            }
            loop()
        )
        workPile.Post problemDef.initData
        flag.Wait() |> ignore
        foundSoFar :> seq<_>

我将 MailboxProcessor 用作工作堆,从中获取大量元素,通过 HashSet 过滤它们,并使用其结果插入工作堆的新元素创建任务。重复此过程,直到没有新元素产生。这段代码的目的是在工作堆中异步插入块,从而使用任务。我的问题是没有并行性。

编辑:感谢@jon-harrop,我解决了由于 seq 的惰性导致的并发问题,并按照建议重新编写了代码。有没有办法摆脱 ManualResetEvent 而不使用区分联合作为代理的消息类型(以支持询问消息)?

4

2 回答 2

2

如果没有一个完整的例子,我发现很难理解你的代码做了什么(可能是因为它结合了很多不同的并发编程原语,这使得它有点难以理解)。

无论如何,主体MailboxProcessor只执行一次(如果你想使用普通代理获得并发,你需要启动多个代理)。在代理的主体中,您启动一​​个problemDef.generators为每个chunk.

这意味着problemDef.generators应该并行运行。然而,调用foundSoFar.Containsand foundSoFar.UnionWithas的代码Seq.distinct总是按顺序运行。

因此,如果problemDef.generators是一个简单而高效的函数,跟踪foundSoFar(按顺序完成)的开销可能比并行化得到的要大。

我不熟悉MSet<'T>,但如果它是(或者如果你用它替换)一个线程安全的可变集,那么你应该能够在Task.StartNew(与其他联合并行)中运行一些联合。

PS:如我所说,不运行代码很难分辨,所以我的想法可能完全错误!

于 2013-01-07T16:18:25.490 回答
1

ManualResetEventSlim 您正在混合使用非常糟糕的高级并发原语(任务和代理) 。可以PostAndReply改用吗?

Seq用于在生成的任务中执行“工作”,该任务是惰性的,因此在回发之前它实际上不会做任何事情。你可以用类似的东西在任务内部强制评估Array.ofSeq吗?

您使用的Task方式异常。切换到Async.Start.

如果没有完整的解决方案,我无法验证我的任何猜测......

think 足够并发以利用 4 个内核

您对多核并行性的心智模型可能完全偏离了标准。

于 2013-01-08T08:47:05.700 回答