1

我想知道一些并行任务何时完成。

我正在使用此代码在网站上制作 1500 到 2000 之间的小型 WebClient.DownloadString 和 10 秒 HttpRequest Timeout:

Task.Factory.StartNew(() => 
    Parallel.ForEach<string>(myKeywords, new ParallelOptions 
    { MaxDegreeOfParallelism = 5 }, getKey));

有时,查询失败,因此出现异常,函数永远无法完成,并且每个 getKey 函数内部的 UI 刷新有时似乎被调用了两次,因此我无法准确了解完成了多少任务。我在计算:UI 刷新调用次数/关键字总数,得到的结果在 100% 到 250% 之间,我不知道任务何时完成。我搜索了很多 SO 讨论,但没有一个是直接的方法或适合我需要的方法。所以我猜 Framework 4.0 没有提供任何 Tasks.AllCompleted 事件处理程序或类似的解决方法?

我应该在另一个线程而不是我的 UI 线程中运行我的 Parallel.Foreach 然后添加它吗?

myTasks.WaitAll

[编辑]

一个临时的解决方案是将我的字符串列表复制到一个 ArrayList 中,然后在每个查询开始时从列表中一个一个地删除每个项目。无论何时该功能运行良好与否,我都知道何时处理了所有项目。

4

1 回答 1

2

Parallel.ForEach在处理异常方面与其他循环没有什么不同。如果抛出异常,那么它将停止处理循环。这可能就是您看到百分比差异的原因(我假设您在处理循环时可能正在处理计数)。

此外,您实际上并不需要Parallel.ForEach,因为您在WebClient上进行的异步调用将阻塞等待IO 完成(网络响应),它们不受计算限制(Parallel.ForEach当您受到计算限制时会好得多)。

也就是说,您应该首先将您的调用翻译WebClient为使用Task<TResult>使用类将基于事件的异步模式转换为基于任务的异步模式很简单。TaskCompletionSource<TResult>

假设您有一系列Uri因调用而生成的实例getKey,您可以创建一个函数来执行此操作:

static Task<String> DownloadStringAsync(Uri uri)
{
    // Create a WebClient
    var wc = new WebClient();

    // Set up your web client.

    // Create the TaskCompletionSource.
    var tcs = new TaskCompletionSource<string>();

    // Set the event handler on the web client.
    wc.DownloadStringCompleted += (s, e) => {
        // Dispose of the WebClient when done.
        using (wc)
        {
            // Set the task completion source based on the
            // event.
            if (e.Cancelled)
            {
                // Set cancellation.
                tcs.SetCancelled();
                return;
            }

            // Exception?
            if (e.Error != null)
            { 
                // Set exception.
                tcs.SetException(e.Error);
                return;
            }

            // Set result.
            tcs.SetResult(e.Result);
        };

    // Return the task.
    return tcs.Task;
};

请注意,上面的内容可以优化为使用one WebClient,留给您作为练习(假设您的测试表明您需要它)。

从那里,您可以获得以下序列Task<string>

// Gotten from myKeywords
IEnumerable<Uri> uris = ...;

// The tasks.
Task<string>[] tasks = uris.Select(DownloadStringAsync).ToArray();

请注意,您必须调用ToArray扩展方法才能开始运行任务。这是为了绕过延迟执行。您不必调用ToArray,但必须调用将枚举整个列表并导致任务开始运行的东西。

一旦有了这些实例,就可以通过调用class上的方法Task<string>来等待它们全部完成,如下所示:ContinueWhenAll<TAntecedentResult>TaskFactory

Task.Factory.ContinueWhenAll(tasks, a => { }).Wait();

完成后,您可以循环遍历tasks数组并查看Exception 和/或Result属性以检查异常或结果是什么。

如果您正在更新用户界面,那么您应该查看拦截对Enumerable.Select的调用,也就是说,您应该在下载完成时调用上的ContinueWith<TNewResult>方法Task<TResult>来执行操作,如下所示:

// The tasks.
Task<string>[] tasks = uris.
    Select(DownloadStringAsync).
    // Select receives a Task<T> here, continue that.
    Select(t => t.ContinueWith(t2 => {
        // Do something here: 
        //   - increment a count
        //   - fire an event
        //   - update the UI
        // Note that you have to take care of synchronization here, so
        // make sure to synchronize access to a count, or serialize calls
        // to the UI thread appropriately with a SynchronizationContext.
        ...

        // Return the result, this ensures that you'll have a Task<string>
        // waiting.
        return t2;
    })).
    ToArray();

这将允许您在事情发生时对其进行更新。请注意,在上述情况下,如果您Select再次调用,您可能需要检查状态t2并触发其他一些事件,具体取决于您希望错误处理机制是什么。

于 2012-10-09T20:29:26.483 回答