我在 F# 中有以下代码,认为它足够并发以利用我机器的 4 个内核。然而,cpu 的使用仅限于一个核心。
member x.Solve problemDef =
use flag = new ManualResetEventSlim(false)
let foundSoFar = MSet<'T>()
let workPile = MailboxProcessor<seq<'T>>.Start(fun inbox ->
let remaining = ref 0
let rec loop() = async {
let! data = inbox.Receive()
let data = data |> Seq.filter (not << foundSoFar.Contains) |> Array.ofSeq
foundSoFar.UnionWith data
let jobs = ref -1
for chunk in data |> Seq.distinct |> Seq.chunked 5000 do
Async.Start <| async {
Seq.collect problemDef.generators chunk
|> Array.ofSeq
|> inbox.Post
}
incr jobs
remaining := !remaining + !jobs
if (!remaining = 0 && !jobs = -1) then
flag.Set() |> ignore
else
return! loop()
}
loop()
)
workPile.Post problemDef.initData
flag.Wait() |> ignore
foundSoFar :> seq<_>
我将 MailboxProcessor 用作工作堆,从中获取大量元素,通过 HashSet 过滤它们,并使用其结果插入工作堆的新元素创建任务。重复此过程,直到没有新元素产生。这段代码的目的是在工作堆中异步插入块,从而使用任务。我的问题是没有并行性。
编辑:感谢@jon-harrop,我解决了由于 seq 的惰性导致的并发问题,并按照建议重新编写了代码。有没有办法摆脱 ManualResetEvent 而不使用区分联合作为代理的消息类型(以支持询问消息)?