
I am using AsParallel with WithDegreeOfParallelism and WithCancellation in the following way:

AsParallel().WithCancellation(cs.Token).WithDegreeOfParallelism(2)

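This is my understanding of it: only two items from the incoming sequence are processed at a time, and as soon as one of them completes, another item is processed. However, if cancellation is requested, the items in the incoming queue that have not been picked up yet are not processed at all. Based on this understanding, I created the following code.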

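    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks; // also provides TaskEx when Microsoft.Bcl.Async (or the Async CTP) is referenced

    class Employee
    {
        public int ID { get; set; }
        public string FirstName { get; set; }
        public string LastName { get; set; }
    }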

    class Program
    {

        private static List<Employee> _Employees;
        static CancellationTokenSource cs = new CancellationTokenSource();
        static Random rand = new Random();

        static void Main(string[] args)
        {
            _Employees = new List<Employee>() 
            {
                new Employee() { ID = 1, FirstName = "John", LastName = "Doe" },
                new Employee() { ID = 2, FirstName = "Peter", LastName = "Saul" },
                new Employee() { ID = 3, FirstName = "Mike", LastName = "Sue" },
                new Employee() { ID = 4, FirstName = "Catherina", LastName = "Desoza" },
                new Employee() { ID = 5, FirstName = "Paul", LastName = "Smith" },
                new Employee() { ID = 6, FirstName = "Paul2", LastName = "Smith" },
                new Employee() { ID = 7, FirstName = "Paul3", LastName = "Smith" },
                new Employee() { ID = 8, FirstName = "Paul4", LastName = "Smith" },
                new Employee() { ID = 9, FirstName = "Paul5", LastName = "Smith" },
                new Employee() { ID = 10, FirstName = "Paul6", LastName = "Smith" },
                new Employee() { ID = 5, FirstName = "Paul7", LastName = "Smith" }
            };

            try
            {
                var tasks = _Employees.AsParallel().WithCancellation(cs.Token).WithDegreeOfParallelism(2).Select(x => ProcessThisEmployee(x, cs.Token)).ToArray();
                Console.WriteLine("Now waiting");
                Thread.Sleep(1000);
                cs.Cancel();
                Task.WaitAll(tasks);
            }
            catch (AggregateException ae)
            {
                // error handling code
                Console.WriteLine("something bad happened");
            }
            catch (Exception ex)
            {
                // error handling code
                Console.WriteLine("something even worst happened");
            }
            // other stuff
            Console.WriteLine("All Done");
        }

        private static async Task ProcessThisEmployee(Employee x, CancellationToken token)
        {
            if (token.IsCancellationRequested)
            {
                Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Cancelled", System.Threading.Thread.CurrentThread.ManagedThreadId));
                return;
            }
            int Sleep = rand.Next(800, 2000);
            Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Sleeping for {2}", System.Threading.Thread.CurrentThread.ManagedThreadId, x.ID, Sleep));
            await TaskEx.Run(() => System.Threading.Thread.Sleep(Sleep));

            Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} finished", System.Threading.Thread.CurrentThread.ManagedThreadId, x.ID));
        }

    }

This is the output when I run it:

ThreadID = 3 -> Employee 1 -> Sleeping for 1058
ThreadID = 1 -> Employee 7 -> Sleeping for 1187
ThreadID = 1 -> Employee 8 -> Sleeping for 1296
ThreadID = 1 -> Employee 9 -> Sleeping for 1614
ThreadID = 1 -> Employee 10 -> Sleeping for 1607
ThreadID = 1 -> Employee 5 -> Sleeping for 1928
ThreadID = 3 -> Employee 2 -> Sleeping for 1487
ThreadID = 3 -> Employee 3 -> Sleeping for 1535
ThreadID = 3 -> Employee 4 -> Sleeping for 1265
ThreadID = 3 -> Employee 5 -> Sleeping for 1248
ThreadID = 3 -> Employee 6 -> Sleeping for 807
Now waiting
ThreadID = 3 -> Employee 6 finished
ThreadID = 4 -> Employee 1 finished
ThreadID = 5 -> Employee 7 finished
ThreadID = 6 -> Employee 8 finished
ThreadID = 3 -> Employee 5 finished
ThreadID = 4 -> Employee 9 finished
ThreadID = 5 -> Employee 10 finished
ThreadID = 6 -> Employee 5 finished
ThreadID = 3 -> Employee 4 finished
ThreadID = 7 -> Employee 2 finished
ThreadID = 8 -> Employee 3 finished
All Done

These are my problems (based on my understanding of things):

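  1. I expected that for some employees ProcessThisEmployee would not be called at all, because the operation would be cancelled, but it is called for every employee.
  2. Even when ProcessThisEmployee is called, I expected it to go through the following code path, but that does not happen either: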

    if ( token.IsCancellationRequested )
    {
        Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Cancelled",System.Threading.Thread.CurrentThread.ManagedThreadId));
        return;
    }
    

So I changed ProcessThisEmployee, essentially moving the token.IsCancellationRequested check to after the Sleep, as follows:

private static async Task ProcessThisEmployee(Employee x, CancellationToken token)
{

    int Sleep = rand.Next(800, 2000);
    Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Sleeping for {2}", System.Threading.Thread.CurrentThread.ManagedThreadId, x.ID, Sleep));
    await TaskEx.Run(() => System.Threading.Thread.Sleep(Sleep));
    if (token.IsCancellationRequested)
    {
        Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Cancelled", System.Threading.Thread.CurrentThread.ManagedThreadId));
        return;
    }
    Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} finished", System.Threading.Thread.CurrentThread.ManagedThreadId, x.ID));
}

Now I get the following output:

ThreadID = 3 -> Employee 1 -> Sleeping for 1330  
ThreadID = 1 -> Employee 7 -> Sleeping for 1868  
ThreadID = 3 -> Employee 2 -> Sleeping for 903  
ThreadID = 3 -> Employee 3 -> Sleeping for 1241  
ThreadID = 3 -> Employee 4 -> Sleeping for 1367  
ThreadID = 3 -> Employee 5 -> Sleeping for 1007  
ThreadID = 3 -> Employee 6 -> Sleeping for 923  
ThreadID = 1 -> Employee 8 -> Sleeping for 1032  
ThreadID = 1 -> Employee 9 -> Sleeping for 1948  
ThreadID = 1 -> Employee 10 -> Sleeping for 1456  
ThreadID = 1 -> Employee 5 -> Sleeping for 1737  
Now waiting  
ThreadID = 5 -> Employee 2 finished  
ThreadID = 3 -> Employee 6 finished  
something bad happened  
All Done  

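My question is: what am I misunderstanding about this workflow? Basically, I want to cancel the operation as soon as possible instead of letting the long-running work run to completion (the sleep is just an example here; the real work could be very expensive).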
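A side note on the second run: the "Cancelled" message passes a {1} placeholder to string.Format but supplies only the thread ID. String.Format throws FormatException when a placeholder has no matching argument, so each employee whose sleep ended after cs.Cancel() faulted its Task instead of printing "Cancelled", and Task.WaitAll then surfaced those failures as the AggregateException behind "something bad happened". The first version contains the same call but never reaches it, because the token is not cancelled yet when that check runs. A minimal illustration, where CancelledMessage is a hypothetical helper name:

```csharp
using System;
using System.Threading;

static class CancelledMessage
{
    const string Template = "ThreadID = {0} -> Employee {1} -> Cancelled";

    // Corrected call: both {0} and {1} have a matching argument.
    public static string Format(int employeeId) =>
        string.Format(Template, Thread.CurrentThread.ManagedThreadId, employeeId);

    // The question's call: only {0} has an argument, so string.Format throws
    // FormatException. Inside an async method that exception faults the Task,
    // and Task.WaitAll rethrows it wrapped in an AggregateException.
    public static bool QuestionVersionThrows()
    {
        try
        {
            _ = string.Format(Template, Thread.CurrentThread.ManagedThreadId);
            return false;
        }
        catch (FormatException)
        {
            return true;
        }
    }
}
```

Passing x.ID as the second argument, as the "Sleeping for" and "finished" messages already do, removes this particular exception; it does not make the sleep itself stop any earlier.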


1 Answer


There are a few problems with the code:

1.) ToArray() materializes the sequence, i.e. it only returns after every element of the source sequence has gone through Select(...).

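Since you call cs.Cancel() only after that, token.IsCancellationRequested is never true at the beginning of ProcessThisEmployee: by the time the token is cancelled, every call has already started.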

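2.) WithDegreeOfParallelism(2).Select(x => ProcessThisEmployee(x, cs.Token)) looks fine, but it doesn't really do what you want, because ProcessThisEmployee is an async method, and an async method returns to its caller as soon as it reaches its first return or its first await (on a task that has not completed yet).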

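What you probably want is to run the long-running ProcessThisEmployee method with a degree of parallelism of only 2. What you actually do is create a bunch of Tasks with a degree of parallelism of 2; after that, the tasks themselves all run concurrently.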
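Both points can be observed with a small, self-contained sketch. WorkAsync is a hypothetical stand-in for ProcessThisEmployee (Task.Delay replaces TaskEx.Run(() => Thread.Sleep(...))), and the counters only record how many calls had started before the cancel and how many tasks were in flight at the same time:

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class PlinqAsyncPitfallDemo
{
    static readonly object Sync = new object();
    static int _started, _inFlight, _maxInFlight;

    // Hypothetical stand-in for ProcessThisEmployee: async, and ignores cancellation.
    static async Task WorkAsync(int item)
    {
        lock (Sync)
        {
            _started++;                                   // synchronous part: runs inside PLINQ's Select
            _inFlight++;
            if (_inFlight > _maxInFlight) _maxInFlight = _inFlight;
        }
        await Task.Delay(300);                            // first await: WorkAsync hands its Task back here
        lock (Sync) { _inFlight--; }
    }

    public static (int startedBeforeCancel, int maxInFlight) Run(int count)
    {
        lock (Sync) { _started = _inFlight = _maxInFlight = 0; }
        using var cts = new CancellationTokenSource();
        Task[] tasks = Enumerable.Range(1, count)
            .AsParallel()
            .WithCancellation(cts.Token)
            .WithDegreeOfParallelism(2)
            .Select(x => WorkAsync(x))
            .ToArray();                                   // point 1: returns only after every WorkAsync call has started
        int startedBeforeCancel;
        lock (Sync) { startedBeforeCancel = _started; }
        cts.Cancel();                                     // the query has already finished, and WorkAsync never sees the token
        Task.WaitAll(tasks);
        lock (Sync) { return (startedBeforeCancel, _maxInFlight); } // point 2: peak concurrency was not limited to 2
    }
}
```

With Run(10), startedBeforeCancel should equal 10 and maxInFlight should be greater than 2: PLINQ only limits how many calls to WorkAsync are being made at once, not how many of the returned tasks are running.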

I don't know how to fix this in your particular case, because I don't know the context. But maybe this already helps.
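If the goal is the one described in point 2 (run the long-running work itself with at most two items in flight, and stop as soon as possible on cancellation), one common technique for async work is to throttle with SemaphoreSlim instead of PLINQ. This is a swapped-in technique, not something the answer prescribes; ThrottledProcessor, ProcessAllAsync and the work delegate are hypothetical names, and the work must observe the token itself (for example Task.Delay(ms, token) instead of Thread.Sleep) for items that are already running to stop early:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class ThrottledProcessor
{
    // Runs `work` for every item with at most `maxConcurrency` items in flight,
    // and stops starting new items once `token` is cancelled.
    // Returns how many items completed successfully.
    public static async Task<int> ProcessAllAsync(
        IEnumerable<int> items,
        int maxConcurrency,
        Func<int, CancellationToken, Task> work,
        CancellationToken token)
    {
        using var gate = new SemaphoreSlim(maxConcurrency, maxConcurrency);
        int completed = 0;

        async Task RunOneAsync(int item)
        {
            // Waits for a free slot; throws OperationCanceledException if the token
            // is cancelled first, so items still waiting here never start.
            await gate.WaitAsync(token);
            try
            {
                await work(item, token);   // `work` should observe the token as well
                Interlocked.Increment(ref completed);
            }
            finally
            {
                gate.Release();
            }
        }

        try
        {
            await Task.WhenAll(items.Select(item => RunOneAsync(item)));
        }
        catch (OperationCanceledException)
        {
            // Cancellation requested: report how many items finished before it.
        }
        return completed;
    }
}
```

Because ProcessAllAsync is awaited rather than blocked on, it can also be called from a UI event handler without freezing the UI.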


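Update in reply to your comment: "I am doing ToArray, and ProcessThisEmployee is an async method, because this code will be part of a library that can be used from a WPF application. End users may want to show updates in the UI, so I don't want to block until the operation completes." (john smith)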

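Don't write async wrappers for work that isn't inherently asynchronous (inherently asynchronous work is mostly file, network, or database access). If a developer using your library wants to call something in an async context, they can still do await Task.Run(...). For more on this, see the article on whether you should expose asynchronous wrappers for synchronous methods.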
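A minimal sketch of that split, assuming the library work is CPU-bound: the library exposes a synchronous, cancellable method, and the caller decides to offload it with Task.Run. EmployeeLibrary, ProcessEmployees and EmployeeViewModel are hypothetical names; IProgress<T> is one standard .NET way to cover the "updates in the UI" requirement from the comment, but using it here is an assumption, not part of the answer:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical library API: synchronous and CPU-bound, but cancellable and able
// to report progress, instead of exposing an async wrapper.
static class EmployeeLibrary
{
    public static List<int> ProcessEmployees(
        IReadOnlyList<int> employeeIds,
        CancellationToken token,
        IProgress<int> progress = null)
    {
        var processed = new List<int>();
        foreach (int id in employeeIds)
        {
            token.ThrowIfCancellationRequested();   // stop before starting the next expensive item
            Thread.SpinWait(100_000);               // stand-in for expensive synchronous work
            processed.Add(id);
            progress?.Report(id);                   // e.g. a Progress<int> created on the WPF UI thread
        }
        return processed;
    }
}

// Caller side (e.g. a WPF view model): the caller, not the library, chooses to
// move the work off the UI thread with Task.Run.
static class EmployeeViewModel
{
    public static Task<List<int>> LoadAsync(
        IReadOnlyList<int> ids, CancellationToken token, IProgress<int> progress = null) =>
        Task.Run(() => EmployeeLibrary.ProcessEmployees(ids, token, progress), token);
}
```

Awaiting LoadAsync from the UI keeps the UI responsive, while the library itself stays synchronous and can still be called directly from non-UI code.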

In my opinion, PLINQ is most useful when you already have a working LINQ query that lends itself to parallel processing and you want to speed it up.
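For contrast, when the delegate does its work synchronously (the kind of LINQ query described above), WithDegreeOfParallelism(2) really limits the work to two items at a time, and WithCancellation behaves much like the question expected: PLINQ stops handing out new items shortly after the token is cancelled and throws OperationCanceledException. A minimal sketch, where PlinqCancellationDemo is a hypothetical name and Thread.Sleep(1) stands in for the expensive work:

```csharp
using System;
using System.Linq;
using System.Threading;

static class PlinqCancellationDemo
{
    // Runs a PLINQ query over blocking, synchronous work and cancels it part-way.
    // Returns how many items were processed and whether the query was cancelled.
    public static (int processed, bool canceled) Run(int itemCount, int cancelAfter)
    {
        using var cts = new CancellationTokenSource();
        int processed = 0;
        try
        {
            Enumerable.Range(1, itemCount)
                .AsParallel()
                .WithCancellation(cts.Token)
                .WithDegreeOfParallelism(2)           // at most 2 delegates run at once
                .Select(i =>
                {
                    Thread.Sleep(1);                  // blocking stand-in for expensive work
                    if (Interlocked.Increment(ref processed) == cancelAfter)
                        cts.Cancel();                 // cancel while the query is still running
                    return i * i;
                })
                .ToArray();
            return (processed, false);
        }
        catch (OperationCanceledException)
        {
            // PLINQ checks the token periodically (not before every element),
            // so a few more items may run before the query stops and throws.
            return (Volatile.Read(ref processed), true);
        }
    }
}
```

With Run(1000, 10), canceled should be true and processed should be far below 1000; the exact count varies between runs because of the periodic polling.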

In your case, the simplest approach is probably a work queue served by 2 threads. I'm pretty sure there are examples of this on the web.
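A minimal sketch of such a work queue, assuming BlockingCollection<T> from System.Collections.Concurrent with two long-running consumer tasks (TwoWorkerQueue and the process delegate are hypothetical names). Unlike the PLINQ-over-async version, at most two items are ever being processed, and cancellation keeps queued items from starting:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class TwoWorkerQueue
{
    // Processes every queued item on `workerCount` dedicated consumers.
    // Once `token` is cancelled, consumers stop taking new items; an item that is
    // already running finishes unless `process` observes the token itself.
    // Returns how many items were processed completely.
    public static int Run(
        IEnumerable<int> items,
        int workerCount,
        Action<int, CancellationToken> process,
        CancellationToken token)
    {
        using var queue = new BlockingCollection<int>();
        foreach (int item in items) queue.Add(item);
        queue.CompleteAdding();   // consumers exit once the queue is drained

        int processed = 0;
        Task[] workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                try
                {
                    foreach (int item in queue.GetConsumingEnumerable(token))
                    {
                        process(item, token);
                        Interlocked.Increment(ref processed);
                    }
                }
                catch (OperationCanceledException)
                {
                    // Cancellation requested: this worker stops taking items.
                }
            }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default))
            .ToArray();

        Task.WaitAll(workers);
        return processed;
    }
}
```

Run blocks the calling thread; from a UI it would itself be started with Task.Run, in line with the advice about async wrappers above.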

answered 2015-01-14T14:58:46.777