1

并行任务的工作流程

http://i.stack.imgur.com/luRnu.png

我希望能就我面临的问题获得帮助。所以问题是我正在运行并行任务来搜索文件夹中的文件。每个任务都需要识别文件并将其添加到文件数组中。接下来,等待每个任务完成,以便收集文件,然后对结果进行排序。接下来,独立处理已排序的文件,通过对每个文件运行一个任务来读取它以获取匹配的模式。最后阶段是以人类可读的格式将所有结果汇总在一起,并以用户友好的方式显示。

所以问题是我想以不阻塞 UI 线程的正确方式链接任务。我希望能够在程序所处的任何阶段取消所有内容。

把它们加起来:

第 1 阶段:通过搜索文件夹来查找文件。每个任务通过文件夹树递归搜索。

第 2 阶段:对找到的所有文件进行排序并清理重复项

第 3 阶段:开始新任务以独立处理文件。每个任务都会打开一个文件并搜索匹配的模式。

第 4 阶段:将每个文件搜索的结果聚合成一个巨大的结果集,并使其易于阅读。

     List<Task> myTasks = new List<Task>();

// ==== stage 1 ======
        for(int i = 0; i < 10; i++) {
           string directoryName = directories[i];

           Task t = new Task(() =>
           {
              FindFiles(directoryName);
           });

           myTasks.Add(t);
            t.Start();
        }

// ==== stage 2 ====
        Task sortTask = Task.Factory.ContinueWhenAll(myTasks.ToArray(), (t) =>
        {
           if(_fileResults.Count > 1) {
              // sort the files and remove any duplicates
           }
        });

        sortTask.Wait();

// ==== stage 3 ====
        Task tt = new Task(() =>
        {
             Parallel.For(0, _fileResults.Count, new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = token, TaskScheduler = _taskScheduler },
                    (i, loopstate) => {
              // 1. open file
              // 2. read file
              // 3. read file line by line
              }
        }

// == stage 4 === 
        tt.ContinueWith((t) =>
        {
           // 1. aggregate the file results into one giant result set
           // 2. display the giant result set in human readable format
        }, token, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.FromCurrentSynchronizationContext());

      tt.start();
4

2 回答 2

1

不要同步等待任何任务完成。如果任何这些操作需要在先前创建的任务之后进行,则将该工作添加为该任务的延续。

于 2016-01-05T18:55:05.907 回答
0

您是否考虑过使用该async/await功能 - 根据您的问题,它非常适合您的需求。这是使用它的问题的快速尝试:

try
{
    List<Task<File[]>> stage1Tasks = new List<Task<File[]>>();

    // ==== stage 1 ======
    for (int i = 0; i < 10; i++)
    {
        string directoryName = directories[i];

        Task<File[]> t = Task.Run(() =>
        {
            return FindFiles(directoryName);
        },
            token);

        stage1Tasks.Add(t);
    }

    File[][] files = await Task.WhenAll(stage1Tasks).ConfigureAwait(false);

    // Flatten files.
    File[] _fileResults = files.SelectMany(x => x).ToArray();

    // ==== stage 2 ====
    Task<File[]> sortFilesTask = Task.Run(() =>
    {
        if (_fileResults.Count > 1)
        {
            // sort the files and remove any duplicates
            return _fileResults.Reverse().ToArray();
        }
    },
        token);

    File[] _sortedFileResults = await sortFilesTask.ConfigureAwait(false);

    // ==== stage 3 ====
    Task<SomeResult[]> tt = Task.Run(() =>
    {
        SomeResult[] results = new SomeResult[_sortedFileResults.Length];
        Parallel.ForEach(_sortedFileResults,
            new ParallelOptions {
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                CancellationToken = token,
                TaskScheduler = _taskScheduler
            },
            (i, loopstate) =>
            {
                // 1. open file
                // 2. read file
                // 3. read file line by line
                results[i] = new SomeResult( /* here goes your results for each file */);
            });
        return results;
    },
        token);

    SomeResult[] theResults = await tt.ConfigureAwait(false);


    // == stage 4 === 
    // 1. aggregate the file results into one giant result set
    // 2. display the giant result set in human readable format
    // ....

}
catch (TaskCanceledException)
{
    // some task has been cancelled...
}
于 2016-01-07T13:55:50.707 回答