4

我想使用任务并行库在Silverlight 5.0应用程序中实现以下内容(SL5 有任务工厂但没有 Parallel.For)。我有很多线程知识,但没有关于 TPL 的知识,所以这似乎是一个很好的任务:)

目前我有一些代码,同步执行如下:

public interface IProcessor
{
    IEnumerable<Bar> Provide(Foo param)
}

private IEnumerable<IProcessor> processors; 

public void DoMultiOperations(Foo param, Action<IEnumerable<Bar>> callback)
{
    List<Bar> allResults = new List<Bar>();

    foreach(var processor in this.processors)
    {
        allResults.AddRange(processor.Provide(param));
    }

    callback(allResults);
}

考虑每个IProcessor接受一个Foo参数来Provide返回一个IEnumerable<Bar>. 所有结果的聚合通过回调发送回调用者。

现在一些 IProcessor 立即执行。有些人调用服务器,可能需要几秒钟。我想为 NIProcessor个实例安排 N 个任务,并在所有任务都完成(或超时)时连接IEnumerable<Bar>结果。

如果可能的话,我想为整个操作添加一个超时,所以如果所有操作都没有在 15 秒内完成,请抛出。

非常感谢您的帮助:)

4

5 回答 5

4

同样,我无法测试此代码,但如果 Silverlight 没有 Parallel.ForEach 你可以使用 Task.WaitAll 它应该可以工作

    private IEnumerable<IProcessor> processors;

    public void DoMultiOperations(Foo param, Action<IEnumerable<Bar>> callback)
    {
        var allResults = new ConcurrentQueue<Bar>();
        Task.WaitAll(processors.Select(processor => Task.Factory.StartNew(() => GetData(processor, param, allResults))).ToArray());
        callback(allResults);
    }

    private static void GetData(IProcessor processor, Foo param, ConcurrentQueue<Bar> allResults)
    {
        var enumerable = processor.Provide(param);
        foreach (var bar in enumerable)
        {
            allResults.Enqueue(bar);
        }
    }
于 2012-07-11T13:07:08.670 回答
1

为什么不使用类似下面的东西

// Allow for cancellation.
CancellationTokenSource cancelSource = new CancellationTokenSource();
CancellationToken token = new CancellationToken();
TaskCreationOptions atp = TaskCreationOptions.AttachedToParent; 

List<Bar> allResults = new List<Bar>();
Task<List<Bar>> asyncTask = Task.Factory.StartNew<List<Bar>>(() => asyncMethod(token, atp), token);

// Continuation is returned when the asyncMethod is complete.
asyncTask.ContinueWith(task =>
{
    // Handle the result.
    switch (task.Status)
    {
        // Handle any exceptions to prevent UnobservedTaskException.             
        case TaskStatus.RanToCompletion:
            break;
        case TaskStatus.Canceled:
            break;
        case TaskStatus.Faulted:
    }
}

在您的asyncMethod中,您可以执行以下操作

private List<Bar> asyncMethod(CancellationToken token)
{
    List<Bar> allResults = new List<Bar>();

    foreach(var processor in this.processors)
    {
        Task.Factory.StartNew<IEnumerable<Bar>>(() => 
        {     
            processor.Provide(param);
        }, atp).ContinueWith( cont => { allResults.AddRange(cont.Result) }); 

        // Cancellation requested from UI Thread.
        if (token.IsCancellationRequested)
            token.ThrowIfCancellationRequested();
    }
    return allResults;
}

然后,你可以从第一个片段中List<Bar>调用的延续中获得总体结果 (a) 。task您可以通过 UI 中的某个事件调用取消,例如

// Cancellation requested from UI Thread.
if (token.IsCancellationRequested)
    token.ThrowIfCancellationRequested();

我无法对此进行测试,但类似上面的东西应该可以工作。请参阅此对 TPL 的精彩介绍以获取更多信息和该类的使用...

我希望这是有用的。

于 2012-07-11T17:27:59.037 回答
1

我认为这大致正确

public void DoMultiOperations(Foo param, Action<IEnumerable<Bar>> callback)
{
    var allResults = new List<Bar>();

    // We are using all the default options on the TaskFactory 
    // except when we are appending the results this has to be synchronized 
    // as List<> is not multithreading safe (a more appropriate collection perhaps) 
    var taskFactory = new TaskFactory<IEnumerable<Bar>>(
        TaskCreationOptions.None,
        TaskContinuationOptions.ExecuteSynchronously);

    // Kick off a task for every processor
    var tasks =
        new List<Task<IEnumerable<Bar>>>(processors.Count());
    tasks.AddRange(
        processors.Select(
            processor =>
            taskFactory.StartNew(() => processor.Provide(param))));

    if (Task.WaitAll(tasks.ToArray(), 5 * 1000))
    {
        foreach (Task<IEnumerable<Bar>> task in tasks)
        {
            allResults.AddRange(task.Result);
        }
        callback(allResults);
    }
} 
于 2012-07-11T14:26:23.737 回答
1

这将异步并行运行所有任务:

public void DoMultiOperations(Foo param, Action<IEnumerable<Bar>> callback)
{
    // since each task's callback would access this storage - we are using
    // one of the concurrent queue
    ConcurrentQueue<Bar> allResults = new ConcurrentQueue<Bar>();

    Task[] tasks = this.processors.Select(p => new Task(() =>
        {
            IEnumerable<Bar> results = p.Provide(param);
            foreach (var newItem in results)
            {
                allResults.Enqueue(newItem);
            }
        })).ToArray();

    foreach (var task in tasks)
    {
        task.Start();
    }

    // 5 seconds to wait or inject a value into this method
    Task.WaitAll(tasks, 5000);                
    callback(allResults);
}
于 2012-07-11T14:35:29.203 回答
0

使用 TPL,您可以传递循环状态以通知其他线程在超时的情况下中止。您需要执行以下操作:

    public void DoMultiOperations(Foo param, Action<IEnumerable<Bar>> callback)
    {
        ConcurrentBag<Bar> allResults = new ConcurrentBag<Bar>();

        Stopwatch sw = new Stopwatch();
        sw.Start();

        Parallel.ForEach(this.processors, (processor, loopState) =>
        {
            foreach (Bar item in processor.Provide(param))
            {
                allResults.Add(item);
            }

            if (sw.ElapsedMilliseconds > 15000)
            {
                loopState.Stop();
                throw new TimeoutException();
            }
        });

        callback(allResults);
    }
于 2012-07-11T13:20:06.450 回答