是时候因为对 .NET 中并发的工作原理缺乏了解而再次让自己难堪了:P
我正在尝试编写一个函数,该函数可以封装创建一个异步工作流,该工作流接受 asyncSeq 输入并将其分发给 n 个并行消费者。
let doWorkInParallel bufferSize numberOfConsumers consumer input = async {
let buffer = BlockingQueueAgent<_>(bufferSize)
let inputFinished = ref false
let produce seq = async {
do! seq |> AsyncSeq.iterAsync (Some >> buffer.AsyncAdd)
do! buffer.AsyncAdd None
}
let consumeParallel degree f = async {
let consume f = async {
while not <| !inputFinished do
try
let! data = buffer.AsyncGet(waitTimeout)
match data with
| Some i -> do! f i
// whichever consumer gets the end of the input flips the inputFinished ref so they'll all stop processing
| None -> inputFinished := true
with | :? TimeoutException -> ()
}
do! [for slot in 1 .. degree -> consume f ]
|> Async.Parallel
|> Async.Ignore
}
let startProducer = produce input
let startConsumers = consumeParallel numberOfConsumers consumer
return! [| startProducer; startConsumers |] |> Async.Parallel |> Async.Ignore
}
我一直在用这样的代码对其进行测试,以模拟一个快速创建的序列,但每个项目的工作都需要一段时间。
let input = seq {1..50} |> AsyncSeq.ofSeq
let action i = async {
do! Async.Sleep 500
printfn "%d GET" i
}
let test = doWorkInParallel 10 2 action input
Async.RunSynchronously test
编辑:我似乎已经解决了我最初的问题(呃,我真的没有想过我在用 EventWaitHandle 做什么,太傻了),但我仍然很想听听这是否是解决问题的一种完全愚蠢的方法与 F#。