问题
尽管我将在这里讨论的代码是用 F# 编写的,但它是基于 .NET 4 框架的,并不具体取决于 F# 的任何特殊性(至少看起来如此!)。
我的磁盘上有一些数据,我应该从网络更新,将最新版本保存到磁盘:
type MyData =
{ field1 : int;
field2 : float }
type MyDataGroup =
{ Data : MyData[];
Id : int }
// load : int -> MyDataGroup
let load dataId =
let data = ... // reads from disk
{ Data = data;
Id = dataId }
// update : MyDataGroup -> MyDataGroup
let update dg =
let newData = ... // reads from the network and process
// newData : MyData[]
{ dg with Data = dg.Data
|> Seq.ofArray
|> Seq.append newData
|> processDataSomehow
|> Seq.toArray }
// save : MyDataGroup -> unit
let save dg = ... // writes to the disk
let loadAndSaveAndUpdate = load >> update >> save
问题是对于loadAndSaveAndUpdate
我的所有数据,我必须多次执行该函数:
{1 .. 5000} |> loadAndSaveAndUpdate
每一步都会做
- 一些磁盘 IO,
- 一些数据处理,
- 一些网络 IO(可能有很多延迟),
- 更多的数据处理,
- 和一些磁盘 IO。
在某种程度上并行完成这不是很好吗?不幸的是,我的阅读和解析功能都不是“异步工作流就绪”。
我想出的第一个(不是很好)解决方案
任务
我做的第一件事是设置一个Task[]
并启动它们:
let createTask id = new Task(fun _ -> loadAndUpdateAndSave id)
let tasks = {1 .. 5000}
|> Seq.map createTask
|> Seq.toArray
tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)
然后我按 CTRL+ESC 只是为了查看它使用了多少线程。15, 17, ..., 35, ..., 170, ... 直到杀死应用程序!出了点问题。
平行线
我做了几乎同样的事情,但使用Parallel.ForEach(...)
和结果是一样的:很多很多很多线程。
一个有效的解决方案……有点
然后我决定只启动n
线程,Task.WaitAll(of them)
然后是其他n
,直到没有更多可用的任务。
这是可行的,但问题是当它完成处理n-1
任务时,它会等待,等待,等待最后一个由于大量网络延迟而坚持阻塞的任务。不是很好!
那么,你将如何解决这个问题呢?我很乐意查看不同的解决方案,包括异步工作流(以及在这种情况下如何调整我的非异步函数)、并行扩展、奇怪的并行模式等。
谢谢。