46

我有一些Task<T>可以随意返回的方法await。我想让这些任务在自定义TaskScheduler而不是默认任务上执行。

var task = GetTaskAsync ();
await task;

我知道我可以创建一个新TaskFactory (new CustomScheduler ())的并从中做一个StartNew (),但StartNew ()采取行动并创建一个Task,我已经有了Task(由 a 返回幕后TaskCompletionSource

我怎样才能指定我自己TaskSchedulerawait

4

6 回答 6

51

我认为您真正想要的是做一个Task.Run, 但使用自定义调度程序。StartNew不能直观地使用异步方法;Stephen Toub 有一篇关于 和 之间差异的精彩博客文章Task.RunTaskFactory.StartNew

因此,要创建自己的 custom Run,您可以执行以下操作:

private static readonly TaskFactory myTaskFactory = new TaskFactory(
    CancellationToken.None, TaskCreationOptions.DenyChildAttach,
    TaskContinuationOptions.None, new MyTaskScheduler());
private static Task RunOnMyScheduler(Func<Task> func)
{
  return myTaskFactory.StartNew(func).Unwrap();
}
private static Task<T> RunOnMyScheduler<T>(Func<Task<T>> func)
{
  return myTaskFactory.StartNew(func).Unwrap();
}
private static Task RunOnMyScheduler(Action func)
{
  return myTaskFactory.StartNew(func);
}
private static Task<T> RunOnMyScheduler<T>(Func<T> func)
{
  return myTaskFactory.StartNew(func);
}

然后,您可以在自定义调度程序上执行同步异步方法。

于 2013-03-15T12:08:33.263 回答
12

TaskCompletionSource<T>.Task没有任何操作的情况下构造它,并且在第一次调用时分配调度程序ContinueWith(...)(来自Asynchronous Programming with the Reactive Framework 和 Task Parallel Library - 第 3 部分)。

值得庆幸的是,您可以通过实现自己的类来稍微自定义等待行为INotifyCompletion,然后以类似于await SomeTask.ConfigureAwait(false)配置任务应该在OnCompleted(Action continuation)方法中开始使用的调度程序的模式使用它(从等待任何东西;)。

这是用法:

    TaskCompletionSource<object> source = new TaskCompletionSource<object>();

    public async Task Foo() {
        // Force await to schedule the task on the supplied scheduler
        await SomeAsyncTask().ConfigureScheduler(scheduler);
    }

    public Task SomeAsyncTask() { return source.Task; }

这是一个ConfigureScheduler使用任务扩展方法的简单实现,其中重要部分是OnCompleted

public static class TaskExtension {
    public static CustomTaskAwaitable ConfigureScheduler(this Task task, TaskScheduler scheduler) {
        return new CustomTaskAwaitable(task, scheduler);
    }
}

public struct CustomTaskAwaitable {
    CustomTaskAwaiter awaitable;

    public CustomTaskAwaitable(Task task, TaskScheduler scheduler) {
        awaitable = new CustomTaskAwaiter(task, scheduler);
    }

    public CustomTaskAwaiter GetAwaiter() { return awaitable; }

    public struct CustomTaskAwaiter : INotifyCompletion {
        Task task;
        TaskScheduler scheduler;

        public CustomTaskAwaiter(Task task, TaskScheduler scheduler) {
            this.task = task;
            this.scheduler = scheduler;
        }

        public void OnCompleted(Action continuation) {
            // ContinueWith sets the scheduler to use for the continuation action
            task.ContinueWith(x => continuation(), scheduler);
        }

        public bool IsCompleted { get { return task.IsCompleted; } }
        public void GetResult() { }
    }
}

这是一个将编译为控制台应用程序的工作示例:

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

namespace Example {
    class Program {
        static TaskCompletionSource<object> source = new TaskCompletionSource<object>();
        static TaskScheduler scheduler = new CustomTaskScheduler();

        static void Main(string[] args) {
            Console.WriteLine("Main Started");
            var task = Foo();
            Console.WriteLine("Main Continue ");
            // Continue Foo() using CustomTaskScheduler
            source.SetResult(null);
            Console.WriteLine("Main Finished");
        }

        public static async Task Foo() {
            Console.WriteLine("Foo Started");
            // Force await to schedule the task on the supplied scheduler
            await SomeAsyncTask().ConfigureScheduler(scheduler);
            Console.WriteLine("Foo Finished");
        }

        public static Task SomeAsyncTask() { return source.Task; }
    }

    public struct CustomTaskAwaitable {
        CustomTaskAwaiter awaitable;

        public CustomTaskAwaitable(Task task, TaskScheduler scheduler) {
            awaitable = new CustomTaskAwaiter(task, scheduler);
        }

        public CustomTaskAwaiter GetAwaiter() { return awaitable; }

        public struct CustomTaskAwaiter : INotifyCompletion {
            Task task;
            TaskScheduler scheduler;

            public CustomTaskAwaiter(Task task, TaskScheduler scheduler) {
                this.task = task;
                this.scheduler = scheduler;
            }

            public void OnCompleted(Action continuation) {
                // ContinueWith sets the scheduler to use for the continuation action
                task.ContinueWith(x => continuation(), scheduler);
            }

            public bool IsCompleted { get { return task.IsCompleted; } }
            public void GetResult() { }
        }
    }

    public static class TaskExtension {
        public static CustomTaskAwaitable ConfigureScheduler(this Task task, TaskScheduler scheduler) {
            return new CustomTaskAwaitable(task, scheduler);
        }
    }

    public class CustomTaskScheduler : TaskScheduler {
        protected override IEnumerable<Task> GetScheduledTasks() { yield break; }
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { return false; }
        protected override void QueueTask(Task task) {
            TryExecuteTask(task);
        }
    }
}
于 2013-06-21T20:19:29.863 回答
8

无法将丰富的异步功能嵌入到自定义TaskScheduler. 这个类的设计没有考虑到async/ await。使用自定义的标准方法TaskScheduler是作为Task.Factory.StartNew方法的参数。此方法不理解异步委托。可以提供异步委托,但它被视为返回某些结果的任何其他委托。要获得异步委托的实际等待结果,必须调用Unwrap()返回的任务。

不过,这不是问题。问题是TaskScheduler基础架构没有将异步委托视为单个工作单元。每个任务被分成多个小任务(使用everyawait作为分隔符),每个小任务单独处理。这严重限制了可以在此类之上实现的异步功能。例如,这里有一个自定义TaskScheduler,旨在一次将提供的任务排队(换句话说,限制并发):

public class MyTaskScheduler : TaskScheduler
{
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1);

    protected async override void QueueTask(Task task)
    {
        await _semaphore.WaitAsync();
        try
        {
            await Task.Run(() => base.TryExecuteTask(task));
            await task;
        }
        finally
        {
            _semaphore.Release();
        }
    }

    protected override bool TryExecuteTaskInline(Task task,
        bool taskWasPreviouslyQueued) => false;

    protected override IEnumerable<Task> GetScheduledTasks() { yield break; }
}

SemaphoreSlim应该确保一次只Task运行一个。不幸的是,它不起作用。信号量过早释放,因为Task调用中传入的QueueTask(task)不是代表异步委托的全部工作的任务,而只是直到第一个await. 其他部分传递给该TryExecuteTaskInline方法。没有办法关联这些任务部分,因为没有提供标识符或其他机制。以下是实践中发生的情况:

var taskScheduler = new MyTaskScheduler();
var tasks = Enumerable.Range(1, 5).Select(n => Task.Factory.StartNew(async () =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Item {n} Started");
    await Task.Delay(1000);
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Item {n} Finished");
}, default, TaskCreationOptions.None, taskScheduler))
.Select(t => t.Unwrap())
.ToArray();
Task.WaitAll(tasks);

输出:

05:29:58.346 项目 1 开始
05:29:58.358 项目 2 开始
05:29:58.358 项目 3 开始
05:29:58.358 项目 4 开始
05:29:58.358 项目 5 开始
05:29:59.358 项目 1 完成
05: 29:59.374 第 5 项已完成
05:29:59.374 第 4 项已完成
05:29:59.374 第 2 项已完成
05:29:59.374 第 3 项已完成

灾难,所有任务一次排队。

结论:当需要高级异步功能时,自定义TaskScheduler类不是要走的路。

更新:这是另一个观察,关于TaskScheduler在环境存在的custom SynchronizationContext。默认情况下,该await机制捕获 currentSynchronizationContext或 current TaskScheduler,并在捕获的上下文或调度程序上调用延续。如果两者都存在,SynchronizationContext则首选当前,而TaskScheduler忽略当前。下面是此行为的演示,在 WinForms 应用程序中¹:

private async void Button1_Click(object sender, EventArgs e)
{
    await Task.Factory.StartNew(async () =>
    {
        MessageBox.Show($"{Thread.CurrentThread.ManagedThreadId}, {TaskScheduler.Current}");
        await Task.Delay(1000);
        MessageBox.Show($"{Thread.CurrentThread.ManagedThreadId}, {TaskScheduler.Current}");
    }, default, TaskCreationOptions.None,
        TaskScheduler.FromCurrentSynchronizationContext()).Unwrap();
}

单击该按钮会依次弹出两条消息,其中包含以下信息:

1、System.Threading.Tasks.SynchronizationContextTaskScheduler

1、System.Threading.Tasks.ThreadPoolTask​​Scheduler

这个实验表明,只有异步委托的第一部分,即第一部分之前的部分await,被安排在非默认调度程序上。这种行为甚至进一步限制了 customTaskScheduler在启用 async/await 的环境中的实际用途。

¹ Windows Forms 应用程序在调用该方法WindowsFormsSynchronizationContext时会自动安装。Application.Run

于 2019-08-29T03:18:03.583 回答
4

在评论之后,您似乎想要控制在等待之后运行代码的调度程序。

编译从默认情况下在当前 SynchronizationContext 上运行的 await 创建一个延续。所以你最好的办法是SynchronizationContext在调用之前设置等待。

有一些方法可以等待特定的上下文。请参阅Jon Skeet 的Configure Await,尤其是有关 SwitchTo 的部分,以获取有关如何实现此类功能的更多信息。

编辑:TaskEx 中的 SwitchTo 方法已被删除,因为它太容易误用。有关原因,请参阅MSDN 论坛。

于 2013-03-15T10:14:35.833 回答
3

你能适应这个方法调用吗:

  await Task.Factory.StartNew(
        () => { /* to do what you need */ }, 
        CancellationToken.None, /* you can change as you need */
        TaskCreationOptions.None, /* you can change as you need */
        customScheduler);
于 2014-04-30T09:52:56.287 回答
-1

面对同样的问题,尝试使用LimitedConcurrencyLevelTask​​Scheduler,但它不支持异步任务。所以...

刚刚编写了我自己的小型简单调度程序,它允许基于全局 ThreadPool(和 Task.Run 方法)运行异步任务,并能够限制当前的最大并行度。这对我的确切目的来说已经足够了,也许对你们也有帮助,伙计们。

主要演示代码(控制台应用程序,dotnet core 3.1):

    static async Task Main(string[] args)
    {

        //5 tasks to run per time
        int concurrentLimit = 5;
        var scheduler = new ThreadPoolConcurrentScheduler(concurrentLimit);

        //catch all errors in separate event handler
        scheduler.OnError += Scheduler_OnError;

        // just monitor "live" state and output to console
        RunTaskStateMonitor(scheduler);

        // simulate adding new tasks "on the fly"
        SimulateAddingTasksInParallel(scheduler);

        Console.WriteLine("start adding 50 tasks");

        //add 50 tasks
        for (var i = 1; i <= 50; i++)
        {
            scheduler.StartNew(myAsyncTask);
        }

        Console.WriteLine("50 tasks added to scheduler");

        Thread.Sleep(1000000);


    }

支持代码(放在同一个地方):

    private static void Scheduler_OnError(Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }

    private static int currentTaskFinished = 0;

    //your sample of async task
    static async Task myAsyncTask()
    {
        Console.WriteLine("task started ");

        using (HttpClient httpClient = new HttpClient())
        {
            //just make http request to ... wikipedia!
            //sorry, Jimmy Wales! assume,guys, you will not DDOS wiki :)
            var uri = new Uri("https://wikipedia.org/");
            var response = await httpClient.GetAsync(uri);
            string result = await response.Content.ReadAsStringAsync();
            if (string.IsNullOrEmpty(result))
                Console.WriteLine("error, await is not working");
            else
                Console.WriteLine($"task result : site length is {result.Length}");
        }
        //or simulate it using by sync sleep
        //Thread.Sleep(1000);
        //and for tesing exception : 
        //throw new Exception("my custom error");
        Console.WriteLine("task finished ");

        //just incrementing total ran tasks to output in console
        Interlocked.Increment(ref currentTaskFinished);
    }

    static void SimulateAddingTasksInParallel(ThreadPoolConcurrentScheduler taskScheduler)
    {
        int runCount = 0;
        Task.Factory.StartNew(() =>
        {
            while (true)
            {
                runCount++;

                if (runCount > 5)
                    break;

                //every 10 sec 5 times
                Thread.Sleep(10000);

                //adding new 5 tasks from outer task
                Console.WriteLine("start adding new 5 tasks!");
                for (var i = 1; i <= 5; i++)
                {
                    taskScheduler.StartNew(myAsyncTask);
                }

                Console.WriteLine("new 5 tasks added!");
            }
        }, TaskCreationOptions.LongRunning);
    }

    static void RunTaskStateMonitor(ThreadPoolConcurrentScheduler taskScheduler)
    {
        int prev = -1;
        int prevQueueSize = -1;
        int prevFinished = -1;
        Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    // getting current thread count in working state
                    var currCount = taskScheduler.GetCurrentWorkingThreadCount();
                    // getting inner queue state
                    var queueSize = taskScheduler.GetQueueTaskCount();

                    //just output overall state if something changed
                    if (prev != currCount || queueSize != prevQueueSize || prevFinished != currentTaskFinished)
                    {
                        Console.WriteLine($"Monitor : running tasks:{currCount}, queueLength:{queueSize}. total Finished tasks : " + currentTaskFinished);
                        prev = currCount;
                        prevQueueSize = queueSize;
                        prevFinished = currentTaskFinished;
                    }

                    // check it every 10 ms
                    Thread.Sleep(10);
                }
            }
            , TaskCreationOptions.LongRunning);
    }

调度器:

public class ThreadPoolConcurrentScheduler
{
    private readonly int _limitParallelThreadsCount;
    private int _threadInProgressCount = 0;

    public delegate void onErrorDelegate(Exception ex);
    public event onErrorDelegate OnError;

    private ConcurrentQueue<Func<Task>> _taskQueue;
    private readonly object _queueLocker = new object();


    public ThreadPoolConcurrentScheduler(int limitParallelThreadsCount)
    {
        //set maximum parallel tasks to run
        _limitParallelThreadsCount = limitParallelThreadsCount;
        // thread-safe queue to store tasks
        _taskQueue = new ConcurrentQueue<Func<Task>>();
    }

    //main method to start async task
    public void StartNew(Func<Task> task)
    {
        lock (_queueLocker)
        {
            // checking limit
            if (_threadInProgressCount >= _limitParallelThreadsCount)
            {
                //waiting new "free" threads in queue
                _scheduleTask(task);
            }
            else
            {
                _startNewTask(task);
            }
        }
    }

    private void _startNewTask(Func<Task> task)
    {
        Interlocked.Increment(ref _threadInProgressCount);
        Task.Run(async () =>
        {
            try
            {
                await task();
            }
            catch (Exception e)
            {
                //Console.WriteLine(e);
                OnError?.Invoke(e);
            }
        }).ContinueWith(_onTaskEnded);
    }

    //will be called on task end
    private void _onTaskEnded(Task task)
    {
        lock (_queueLocker)
        {
            Interlocked.Decrement(ref _threadInProgressCount);
            //queue has more priority, so if thread is free - let's check queue first
            if (!_taskQueue.IsEmpty)
            {
                if (_taskQueue.TryDequeue(out var result))
                {
                    _startNewTask(result);
                }
            }
        }
    }

    private void _scheduleTask(Func<Task> task)
    {
        _taskQueue.Enqueue(task);
    }

    //returning in progress task count 
    public int GetCurrentWorkingThreadCount()
    {
        return _threadInProgressCount;
    }

    //return number of tasks waiting to run
    public int GetQueueTaskCount()
    {
        lock (_queueLocker) return _taskQueue.Count;
    }
}

几点注意事项:

  1. 首先 - 检查对它的评论,也许这是有史以来最糟糕的代码!
  2. 没有在产品中测试
  3. 没有实现取消令牌和任何其他功能,应该在那里,但我太懒了。对不起
于 2020-12-27T23:48:27.683 回答