2

我更新了一个使用 SuperSocket 连接到旧 c++ 服务器的旧项目。使用最新版本(从 0.7.0 => 0.8.0.8)我在尝试重新连接时遇到异常(它说套接字是在不同的线程上打开的)我想要一个将任务排入队列的类(第一次连接和重新连接)并在特定线程上运行它们。

我已经看到了这种方法,但是当我尝试运行创建为异常的任务时

不能为先前排队到不同 TaskScheduler 的任务调用 ExecuteTask。

这是我从上面的链接中学习的课程

public class SameThreadTaskScheduler : TaskScheduler, IDisposable
{
    #region publics
    public SameThreadTaskScheduler(string name)
    {
        scheduledTasks = new Queue<Task>();
        threadName = name;
    }
    public override int MaximumConcurrencyLevel => 1;

    public void Dispose()
    {
        lock (scheduledTasks)
        {
            quit = true;
            Monitor.PulseAll(scheduledTasks);
        }
    }
    #endregion

    #region protected overrides
    protected override IEnumerable<System.Threading.Tasks.Task> GetScheduledTasks()
    {
        lock (scheduledTasks)
        {
            return scheduledTasks.ToList();
        }
    }

    protected override void QueueTask(Task task)
    {
        if (myThread == null)
            myThread = StartThread(threadName);
        if (!myThread.IsAlive)
            throw new ObjectDisposedException("My thread is not alive, so this object has been disposed!");
        lock (scheduledTasks)
        {
            scheduledTasks.Enqueue(task);
            Monitor.PulseAll(scheduledTasks);
        }
    }

    public void Queue(Task task)
    {
        QueueTask(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool task_was_previously_queued)
    {
        return false;
    }

    #endregion

    private readonly Queue<System.Threading.Tasks.Task> scheduledTasks;
    private Thread myThread;
    private readonly string threadName;
    private bool quit;

    private Thread StartThread(string name)
    {
        var t = new Thread(MyThread) { Name = name };
        using (var start = new Barrier(2))
        {
            t.Start(start);
            ReachBarrier(start);
        }
        return t;
    }

    private void MyThread(object o)
    {
        Task tsk;
        lock (scheduledTasks)
        {
            //When reaches the barrier, we know it holds the lock.
            //
            //So there is no Pulse call can trigger until
            //this thread starts to wait for signals.
            //
            //It is important not to call StartThread within a lock.
            //Otherwise, deadlock!
            ReachBarrier(o as Barrier);
            tsk = WaitAndDequeueTask();
        }
        for (;;)
        {
            if (tsk == null)
                break;
            TryExecuteTask(tsk);
            lock (scheduledTasks)
            {
                tsk = WaitAndDequeueTask();
            }
        }
    }

    private Task WaitAndDequeueTask()
    {
        while (!scheduledTasks.Any() && !quit)
            Monitor.Wait(scheduledTasks);
        return quit ? null : scheduledTasks.Dequeue();
    }

    private static void ReachBarrier(Barrier b)
    {
        if (b != null)
            b.SignalAndWait();
    }
}

这是我如何称呼任务

public void RegisterServers()
{
    sameThreadTaskScheduler.Queue(new Task(() =>
    {
        ...something
    }));

我究竟做错了什么?

4

1 回答 1

1

您必须启动一个任务才能将其绑定到TaskScheduler. 在您的代码中,您手动将任务排队,以相反的方式执行,因此该任务不会绑定到您的任务调度程序(或任何),并且TryExecuteTask会因该错误而失败。描述相当神秘,因为任何实际TaskScheduler都不同于null.

对于您在未运行的情况下创建的任务,有 和 的重载Task.RunSynchronouslyTask.Start需要TaskScheduler.

对于自动开始运行的任务,有 和 的重载TaskFactory.StartNewTaskFactory<TResult>.StartNew需要TaskScheduler.

对于后续任务,有 , 和 的重载Task.ContinueWith,并且需要.Task<TResult>.ContinueWithTaskFactory.ContinueWhenAllTaskFactory.ContinueWhenAnyTaskScheduler

不带任务调度器的重载等价于指定TaskScheduler.Current. 在 的情况下TaskFactory,默认情况下Task.Factory或在调用工厂方法时创建的没有任务调度程序的情况下都是如此,否则使用工厂的任务调度程序。


Task.Run总是使用TaskScheduler.Default. 根据大多数有经验的人的说法,线程池调度程序更常被期望作为默认值TaskScheduler.Current,而不是线程绑定的,但是更改现有 API 的合同为时已晚。

于 2017-01-25T17:19:08.153 回答