5

如何合并List<T>基于 TPL 的任务以供以后执行?

 public async IEnumerable<Task<string>> CreateTasks(){ /* stuff*/ }

我的假设是.Concat()...

     void MainTestApp()  // Full sample available upon request.
     {
        List<string> nothingList = new List<string>();
        nothingList.Add("whatever");
        cts = new CancellationTokenSource();

         delayedExecution =
            from str in nothingList
            select AccessTheWebAsync("", cts.Token);
         delayedExecution2 =
          from str in nothingList
          select AccessTheWebAsync("1", cts.Token);

         delayedExecution = delayedExecution.Concat(delayedExecution2);
     }


    /// SNIP

    async Task AccessTheWebAsync(string nothing, CancellationToken ct)
    {
        // return a Task
    }

我想确保这不会产生任何任务或评估任何东西。事实上,我想我在问“什么逻辑上执行 IQueryable 到返回数据的东西”?

背景

由于我正在进行递归并且我不想在正确的时间之前执行此操作,如果多次调用合并结果的正确方法是什么?

如果重要的话,我正在考虑运行此命令来启动var AllRunningDataTasks = results.ToList();此代码后跟的所有任务:

while (AllRunningDataTasks.Count > 0)
{
    // Identify the first task that completes.
    Task<TableResult> firstFinishedTask = await Task.WhenAny(AllRunningDataTasks);

    // ***Remove the selected task from the list so that you don't
    // process it more than once.
    AllRunningDataTasks.Remove(firstFinishedTask);

    // TODO: Await the completed task.
    var taskOfTableResult = await firstFinishedTask;

    // Todo: (doen't work)
    TrustState thisState = (TrustState)firstFinishedTask.AsyncState;

    // TODO: Update the concurrent dictionary with data
    // thisState.QueryStartPoint + thisState.ThingToSearchFor 

    Interlocked.Decrement(ref thisState.RunningDirectQueries);
    Interlocked.Increment(ref thisState.CompletedDirectQueries);

    if (thisState.RunningDirectQueries == 0)
    {
        thisState.TimeCompleted = DateTime.UtcNow;
    }
}
4

2 回答 2

2

要回答具体问题“什么逻辑上执行 IQueryable 到返回数据的东西”?这将是任何强制产生至少一个值,或强制发现一个值是否可用的东西。

例如,ToListToArrayFirstSingleSingleOrDefaultCount都将强制求值。(虽然First不会评估整个集合 - 它会检索第一个项目然后停止。)这些都必须至少开始检索值,因为如果不这样做,它们中的任何一个都无法返回它们返回的值。在 and 的情况下ToListToArray这些返回完全填充的非惰性集合,这就是他们必须评估所有内容的原因。返回单个项目的方法至少需要询问第一个项目,然后这些Single方法将继续检查如果继续评估是否没有其他结果(如果结果更多,则抛出异常)。

使用foreach迭代查询也将强制评估。(同样,出于同样的原因:您向它询问集合中的实际值,因此它必须提供它们。)

Concat不会立即评估,因为它不需要 - 只有当您向串联序列询问一个值时,它才需要向其输入询问值。

顺便说一句,尽管您在这里的示例中询问过IQueryable您没有使用它。这可能很重要,因为与您实际获得的 LINQ to Objects 实现(您获得 plain IEnumerable<T>)相比,它的工作方式存在一些差异。我认为这在这个示例中没有什么不同,但它让我想知道您的原始代码与您在此处发布的用于说明的版本之间是否可能发生了变化?这很重要,因为不同的 LINQ 提供者可以以不同的方式做事。绝对使用延迟评估的IEnumerable<T>味道Concat,虽然我希望大多数其他 LINQ 实现都是如此,但这并不是绝对的。

如果您需要多次使用结果,并且您想确保只评估一次,但直到您真正需要它们时才评估它们,那么通常的方法是ToList在您确实需要时调用进行评估,然后保留结果List<T>,以便您可以再次使用它。一旦您以List<T>(或数组)形式获得数据,您就可以根据需要多次使用该列表。

顺便说一句,你的第一个问题有一个问题:

“如何合并基于 TPL 的任务列表以供以后执行?”

一般来说,如果您已经有一个 TPL 任务,那么您无法阻止它执行。(有一个例外。如果您Task直接构造 a 而不是使用一种更正常的创建方法,它实际上不会运行,直到您告诉它。但通常,返回任务的 API 返回实时任务,即,当您拿到它们时,它们很可能已经在运行,甚至已经完成。)

您的示例中的“稍后执行”来自这样一个事实,即您实际上根本没有要开始的任务列表。(如果您确实有一个List<T>任务,那么“稍后执行”将不是一个选项。)您拥有的是几个可枚举,如果您要评估它们,它们将创建任务。创建任务的行为与在任何返回任务的 TAP 样式 API 中启动它的行为是不可分割的。

根据您所写的其余内容,我认为您真正要问的是:

“我如何将多个IEnumerable<Task<T>>对象合并为一个对象IEnumerable<Task<T>>,从而推迟对基础枚举的评估,直到组合的可枚举本身被评估?”

Concat应该为此工作。

于 2013-02-05T17:00:13.100 回答
0

以下是合并数据的一种 hacky 方式...我不喜欢在 Main 或此示例的其他一些方面使用“nothingList”的方式,但它似乎完成了工作并允许我合并待处理的任务。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Data;
using System.Windows.Documents;
using System.Windows.Input;
using System.Windows.Media;
using System.Windows.Media.Imaging;
using System.Windows.Navigation;
using System.Windows.Shapes;

// Add a using directive and a reference for System.Net.Http.
using System.Net.Http;

// Add the following using directive.
using System.Threading;


namespace ProcessTasksAsTheyFinish
{
    public partial class MainWindow : Window
    {
        // Declare a System.Threading.CancellationTokenSource.
        CancellationTokenSource cts;
        List<IEnumerable<Task>> launchList = new List<IEnumerable<Task>>();

        public MainWindow()
        {
            InitializeComponent();

            List<string> nothingList = new List<string>();
            nothingList.Add("whatever");

            cts = new CancellationTokenSource();

             delayedExecution =
                from str in nothingList
                select AccessTheWebAsync("", cts.Token);


             List<string> nothingList2 = new List<string>();
             nothingList2.Add("whatever");

             delayedExecution2 =
              from str in nothingList2
              select AccessTheWebAsync("1", cts.Token);


             launchList.Add(delayedExecution);
             launchList.Add(delayedExecution2);

             delayedExecution = delayedExecution.Concat(delayedExecution2);
        }
        IEnumerable<Task> delayedExecution = null;
        IEnumerable<Task> delayedExecution2 = null;

        private async void startButton_Click(object sender, RoutedEventArgs e)
        {
            resultsTextBox.Clear();

            // Instantiate the CancellationTokenSource.

            try
            {
                // ***Set up the CancellationTokenSource to cancel after 25 seconds.
                //cts.CancelAfter(250000);

                var test  =  delayedExecution;// AccessTheWebAsync("", cts.Token);

                var testList = test.ToList();

                while (testList.Count() > 0)
                {
                    var firstFinishedTask = await Task.WhenAny(testList);
                    testList.Remove(firstFinishedTask);

                      await firstFinishedTask;
                }

                resultsTextBox.Text += "\r\nDownloads complete.";
            }
            catch (OperationCanceledException tee)
            {
                resultsTextBox.Text += "\r\nDownloads canceled.\r\n";
            }
            catch (Exception)
            {
                resultsTextBox.Text += "\r\nDownloads failed.\r\n";
            }

            cts = null;
        }


        private void cancelButton_Click(object sender, RoutedEventArgs e)
        {
            if (cts != null)
            {
                cts.Cancel();
            }
        }


        async Task<string> AccessTheWebAsync(string nothing, CancellationToken ct)
        {
            // CHANGE THIS VALUE TO CONTROL THE TESTING
            bool delayConversionOfQueryToList = false;

            HttpClient client = new HttpClient();

            // Make a list of web addresses.
            List<string> urlList = null;

            if (nothing == "1")
            {
                urlList = SetUpURLList2();
            }
            else urlList = SetUpURLList();

            // ***Create a query that, when executed, returns a collection of tasks.
            IEnumerable<Task<int>> downloadTasksQuery =
                from url in urlList select ProcessURL(url, client, ct);

            // DEBUG!!!
            if (delayConversionOfQueryToList == true)
            {
                await Task.Delay(10000);
                resultsTextBox.Text += String.Format("\r\nDelay of IQueryable complete.  Tip: Did you see any IsRunning messages?");
            }

            // ***Use ToList to execute the query and start the tasks. 
            List<Task<int>> downloadTasks = downloadTasksQuery.ToList();

            // DEBUG!!!
            if (delayConversionOfQueryToList == false)
            {
                await Task.Delay(10000);
                resultsTextBox.Text += String.Format("\r\nDelay of .ToList() complete.   Tip: Did you see any IsRunning messages?");
            }

            // ***Add a loop to process the tasks one at a time until none remain.
            while (downloadTasks.Count() > 0)
            {
                // Identify the first task that completes.
                Task<int> firstFinishedTask = await Task.WhenAny(downloadTasks);

                resultsTextBox.Text += String.Format("\r\nID  {0}", firstFinishedTask.Id);

                // ***Remove the selected task from the list so that you don't
                // process it more than once.
                downloadTasks.Remove(firstFinishedTask);

                // Await the completed task.
                int length = await firstFinishedTask;
                resultsTextBox.Text += String.Format("\r\nLength of the download:  {0}", length);
            }

            return nothing;
        }


        private List<string> SetUpURLList()
        {
            List<string> urls = new List<string> 
            { 
                "http://msdn.microsoft.com",
                "http://msdn.microsoft.com/library/windows/apps/br211380.aspx",
                "http://msdn.microsoft.com/en-us/library/hh290136.aspx",
                "http://msdn.microsoft.com/en-us/library/dd470362.aspx",
                "http://msdn.microsoft.com/en-us/library/aa578028.aspx",
                "http://msdn.microsoft.com/en-us/library/ms404677.aspx",
                "http://msdn.microsoft.com/en-us/library/ff730837.aspx"
            };
            return urls;
        }
        private List<string> SetUpURLList2()
        {
            List<string> urls = new List<string> 
            { 
                "http://www.google.com",

            };
            return urls;
        }

        async Task<int> ProcessURL(string url, HttpClient client, CancellationToken ct)
        {
            resultsTextBox.Text += String.Format("\r\nIS RUNNING {0}", url);

            // GetAsync returns a Task<HttpResponseMessage>. 
            HttpResponseMessage response = await client.GetAsync(url, ct);
            // Retrieve the website contents from the HttpResponseMessage.
            byte[] urlContents = await response.Content.ReadAsByteArrayAsync();

           // Thread.Sleep(3000);
           // await Task.Delay(1000, ct);
           return urlContents.Length;
        }
    }
}

// Sample Output:

IS RUNNING http://msdn.microsoft.com
IS RUNNING http://msdn.microsoft.com/library/windows/apps/br211380.aspx
IS RUNNING http://msdn.microsoft.com/en-us/library/hh290136.aspx
IS RUNNING http://msdn.microsoft.com/en-us/library/dd470362.aspx
IS RUNNING http://msdn.microsoft.com/en-us/library/aa578028.aspx
IS RUNNING http://msdn.microsoft.com/en-us/library/ms404677.aspx
IS RUNNING http://msdn.microsoft.com/en-us/library/ff730837.aspx
IS RUNNING http://www.google.com
Delay of .ToList() complete.   Tip: Did you see any IsRunning messages?
ID  1
Length of the download:  48933
ID  2
Length of the download:  375328
ID  3
Length of the download:  220428
ID  4
Length of the download:  222256
ID  5
Length of the download:  229330
ID  6
Length of the download:  136544
ID  7
Length of the download:  207171
Delay of .ToList() complete.   Tip: Did you see any IsRunning messages?
ID  8
Length of the download:  43945
Downloads complete.
于 2012-12-02T05:01:15.410 回答