1

我最近遇到了一个案例,可以很方便地产生一堆线程,阻塞并等待一个答案(第一个到达),取消其余线程然后解除阻塞。

例如,假设我有一个采用种子值的搜索函数。让我们规定搜索功能可以简单地并行化。此外,我们的搜索空间包含许多潜在的解决方案,并且对于某些种子值,该函数将无限期地搜索,但至少一个种子值将在合理的时间内产生一个解决方案。

如果我可以完全天真地并行进行此搜索,那就太好了,例如:

let seeds = [|0..100|]
Array.Parallel.map(fun seed -> Search(seed)) seeds

可悲的是,Array.Parallel.map将阻塞直到所有线程都完成。真可惜。我总是可以在搜索功能中设置一个超时,但我几乎可以肯定要等待运行时间最长的线程完成;此外,对于某些问题,超时可能不够长。

简而言之,我想要类似于 UNIX 套接字select()调用的东西,仅适用于任意函数。这可能吗?它不必像上面那样采用漂亮的数据并行抽象,也不必是 F# 代码。我什至很乐意使用本机库并通过 P/Invoke 调用它。

4

4 回答 4

4

您可以创建一堆任务,然后分别使用Task.WaitAnyTask.WhenAny同步等待第一个任务完成或创建一个将在第一个任务完成时完成的任务。

一个简单的同步示例:

var tasks = new List<Task<int>>();
var cts = new CancellationTokenSource();

for (int i = 0; i < 10; i++)
{
    int temp = i;
    tasks.Add(Task.Run(() =>
    {
        //placeholder for real work of variable time
        Thread.Sleep(1000 * temp);

        return i;
    }, cts.Token));
}

var value = Task.WaitAny(tasks.ToArray());
cts.Cancel();

或者对于异步版本:

public static async Task<int> Foo()
{
    var tasks = new List<Task<int>>();
    var cts = new CancellationTokenSource();

    for (int i = 0; i < 10; i++)
    {
        int temp = i;
        tasks.Add(Task.Run(async () =>
        {
            await Task.Delay(1000 * temp, cts.Token);
            return temp;
        }));
    }

    var value = await await Task.WhenAny(tasks);
    cts.Cancel();

    return value;
}
于 2013-09-12T19:42:45.227 回答
3
let rnd = System.Random()

let search seed = async {
    let t = rnd.Next(10000)
    //printfn "seed: %d  ms: %d" seed t
    do! Async.Sleep t
    return sprintf "seed %d finish" seed
}

let processResult result = async {
    //Todo:
    printfn "%s" result
}

let cts = new System.Threading.CancellationTokenSource()
let ignoreFun _ = () //if you don't want handle

let tasks = 
    [0..10]
    |> List.map (fun i ->
        async {
            let! result = search i
            do! processResult result
            cts.Cancel()
        }
    )

Async.StartWithContinuations(Async.Parallel tasks, ignoreFun, ignoreFun, ignoreFun, cts.Token)
于 2013-09-13T04:41:29.620 回答
0

这似乎对我有用

namespace CancellParallelLoops
{
    class Program
    {
        static void Main(string[] args)
        {
            int[] nums = Enumerable.Range(0, 10000000).ToArray();
            CancellationTokenSource cts = new CancellationTokenSource();

            // Use ParallelOptions instance to store the CancellationToken
            ParallelOptions po = new ParallelOptions();
            po.CancellationToken = cts.Token;
            po.MaxDegreeOfParallelism = System.Environment.ProcessorCount;
            Console.WriteLine("Press any key to start. Press 'c' to cancel.");
            Console.ReadKey();

            // Run a task so that we can cancel from another thread.
            Task.Factory.StartNew(() =>
            {
                if (Console.ReadKey().KeyChar == 'c')
                    cts.Cancel();
                Console.WriteLine("press any key to exit");
            });

            try
            {
                Parallel.ForEach(nums, po, (num) =>
                {
                    double d = Math.Sqrt(num);
                    Console.WriteLine("{0} on {1}", d, Thread.CurrentThread.ManagedThreadId);
                    if (num == 1000) cts.Cancel();
                    po.CancellationToken.ThrowIfCancellationRequested();
                });
            }
            catch (OperationCanceledException e)
            {
                Console.WriteLine(e.Message);
            }

            Console.ReadKey();
        }
    }
}
于 2013-09-12T19:48:18.803 回答
0

尝试使用事件对象同步所有线程,当您找到解决方案设置事件时,所有其他线程必须定期检查事件状态并停止执行(如果已设置)。

更多详情,请看这里

于 2013-09-12T19:37:28.307 回答