0

澄清问题:

我有以下访问网络和本地资源的脚本。我想将网络连接限制为 N(网站速度很慢),并且本地资源访问 ( executeLocalNetworkProcess) 不应阻止其他网络请求。(所以它总是会运行 N 个网络请求)。

有些类别的项目很少,而另一些类别的项目很多。应该为所有类别的所有项目运行并行执行以利用网络连接。

let categories = getCategories() // get a seq of category from web service
for c in categories do
    getItemsByCategory c // returns a seq of item from web 
    |> Seq.iter (fun (item, _) -> // Want to process N items in parallel
        getMoreDataFromWeb item // from web
        executeLocalNetworkProcess item // local disk/network access; may take a while
        downloadBigFile item  // from web
        )

在 F# 中实现它的最佳方法是什么?

4

2 回答 2

1

我之前做过类似的事情,将序列拆分成不同大小n的批次并并行处理批次。

我们可以使用此 SO 答案中的代码创建批量序列:https ://stackoverflow.com/a/7518857/2461791

从那里我们只需要并行遍历每批中的项目。我喜欢把它放在Array.Parallel模块中。

module Array =
    module Parallel =
        let batchIter size action array =
            let batchesOf n =
                Seq.mapi (fun i v -> i / n, v) >>
                Seq.groupBy fst >>
                Seq.map snd >>
                Seq.map (Seq.map snd)

            for batch in array |> batchesOf size do 
                batch
                |> Seq.toArray    
                |> Array.Parallel.iter action

以下代码将 100 个项目的列表拆分为 8 个批次,并并行打印每个批次的项目。

[1..100]
|> Array.Parallel.batchIter 8 (printfn "%d")

要将其应用于您的代码,您正在查看如下内容:

let categories = getCategories()
for c in categories do
    match c with | (category, description, _) -> printfn "%s" category
    getItemsByCategory c
    |> Array.Parallel.batchIter 8 (fun (item, description, _) ->
        process item
        )

但是,这种方法将等待整个批次完成处理,然后再开始下一个批次。

于 2013-10-08T08:03:06.920 回答
1

您可能希望将来自 F# PowerPack 的PSeq模块源代码包含在您自己的基础库中。然后,您可以简单地调用 PSeq.iter:

for category, description, _ as c in getCategories() do
    printfn "%s" category
    getItemsByCategory c
    |> PSeq.iter(fun (item, description, _) -> process item)
于 2013-10-08T13:13:04.733 回答