1

我将从对我如何理解几件事情的基本解释开始这一点,然后用一个 tldr 来结束这一切;如果人们只是想解决我在这里遇到的实际问题。如果我对这里的任何理解有误,请纠正我。

TPL 代表任务并行库,它是 .NET 4.0 试图进一步简化线程以方便开发人员使用的答案。如果您不熟悉它,(在非常基础的级别上)您启动一个新的 Task 对象并向其传递一个委托,然后该委托在从线程池获取的后台线程上运行(通过使用线程池而不是真正制作通过使用这些现有线程而不是创建和处置新线程来节省新线程、时间和资源)。

据我了解,C# 中的 Parallel.ForEach 命令将为它应该执行的每个委托生成一个新线程(可能来自线程池),但可能的例外是自动执行一个甚至可能更多的内联如果编译器认为它们将足够快地发生以提高效率,则迭代。

与我的目标最相关的背景信息:

我正在尝试制作一个启动任务的快速程序,以便与程序的其余部分同时运行。在此任务中,Parallel.ForEach 运行 3 次“迭代”。总的来说,我们希望程序现在总共运行 5 个线程(最多):1 个用于主线程,1 个用于实际任务,最多 3 个用于 Parallel.ForEach。每个 Thread 都有自己的目标要完成(尽管 Parallel.ForEach 都有相同的目标,但其相关 itemNumber 的值不同以计算。当主线程完成所有目标时,它使用 Task.Wait() 等待在完成任务上,它也等待 Parallel.ForEach 完成。然后使用并验证这些值。

tldr; 实际问题:

运行上述想法时,Parallel.ForEach 似乎正在初始化两倍于我预期的 SynchronizationContexts(本质上是另一个线程的 TPL 对象)并运行所有它们,但只等待预期数量的它们。因为 Parallel.ForEach().Wait() 命令以预期的线程运行数量完成,所以 Task 也会在它认为一切都完成时完成。然后主程序发现任务已经完成,并且当它验证当前没有更多后台线程在运行时,有时剩余的 Parrallel.ForEach() 尚未完成,因此会引发错误。

通过在每个 SynchronizationContext 的 post 调用(Async 方法启动器)时打印到调试窗口来验证线程的数量与我所说的匹配。每个线程还被一个主线程对象引用,否则该对象计划在完成任务时被处置,但由于由于未真正预期创建的未完成线程仍然有引用,因此处置不能正确发生。

Thread testThread = Thread.CurrentThread;
Task backgroundTask = taskFactory.StartNew(() =>
{
    Thread rootTaskThread = Thread.CurrentThread;
    Assert.AreNotEqual(testThread, rootTaskThread, "First task should not inline");
    Thread.Sleep(TimeSpan.FromSeconds(2));

    Parallel.ForEach(new[] { 1, 2, 3, 4 },
       new ParallelOptions { TaskScheduler = taskFactory.Scheduler }, (int item) => {
        Thread.Sleep(TimeSpan.FromSeconds(1));
     });
});

在上面的示例中,主线程、backgroundTask 任务和 8 个 Parallel.ForEach 线程最终都存在,其中最后 9 个线程是在 SynchronizationContexts 上创建的。

我的自定义在 SynchronizationContext 中覆盖的唯一方法是 post,如下所示:

public override void Post(SendOrPostCallback d, object state){
    Request requestOrNull = Request.ExistsForCurrentThread() ? Request.GetForCurrentThread() as Request : null;
    Request.IAsyncContextData requestData = null;

    if (requestOrNull != null){
       requestData = requestOrNull.CaptureDataForNewThreadAndIncrementReferenceCount();
    }

    Debug.WriteLine("Task started - request data " + (requestData == null ? "DOES NOT EXIST" : "EXISTS"));

    base.Post((object internalState) => {
        // Capture the spawned thread state and restore the originating thread state
        try{
            if (requestData != null){
                Request.AttachToAsynchronousContext(requestData);
            }
            d(state);
        }
        finally{
            // Restore original spawned thread state
            if (requestData != null){
            // Disposes the request if this is the last reference to it
                Request.DetachFromAsynchronousContext(requestData);
            }
        Debug.WriteLine("Task completed - request data " + (requestData == null ? "DOES NOT EXIST" : "EXISTS"));
        }
    }, state);
 }

我认为 TaskScheduler 只做它所需的基本工作:

private readonly RequestSynchronizationContext context;
private readonly ConcurrentQueue<Task> tasks = new ConcurrentQueue<Task>();

public RequestTaskScheduler(RequestSynchronizationContext synchronizationContext)
{
    this.context = synchronizationContext;
}

protected override void QueueTask(Task task){
    this.tasks.Enqueue(task);
    this.context.Post((object state) => {
        Task nextTask;
        if (this.tasks.TryDequeue(out nextTask)) 
            this.TryExecuteTask(nextTask);
    }, null);
}

protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued){
    if (SynchronizationContext.Current == this.context)
        return this.TryExecuteTask(task);
    else
        return false;
}

protected override IEnumerable<Task> GetScheduledTasks(){
    return this.tasks.ToArray();
}

任务工厂:

public RequestTaskFactory(RequestTaskScheduler taskScheduler)
    : base(taskScheduler)
{ }

关于为什么会发生这种情况的任何想法?

4

1 回答 1

0

任务本身不会创建线程。如果可以的话,由 TaskScheduler 决定做什么来使操作异步。例如,某些操作使用异步 IO,在这种情况下,硬件使其异步,而不是另一个工作线程。

如何调用延续取决于同步上下文。该上下文不是另一个线程,它只是抽象了可以运行操作的标准。例如,在 WPF、WinForms、Silverlight 等中,有一个 UI 同步上下文,其动作需要在特定线程(UI 线程或主线程,以避免异常)上执行。

ForEach 将尝试创建线程(更具体地说,它将尝试询问同步上下文以启动多个异步操作)。调度程序确实定义了它是如何做到的。如果你给它三个任务,它可能会创建三个线程,也可能不会。它决定三个并发线程是否是一件好事。例如,如果您只有两个内核,则 ForEach 不会创建两个以上的线程,因为由于上下文切换开销,这可能比使用单个线程并按顺序运行代码更糟糕。

不清楚“初始化两倍的 SynchronizationContext”是什么意思。这些不是线程。你只是说它创建的线程比你预期的多吗?还是您的意思是 Post 的调用次数超出了您的预期?您的 SynchronizationContext 类基于什么?(即它的基类是什么)。基础所做的主要定义了 Post 调用的可能性。它可能觉得需要创建另一个异步操作来跟踪其他操作......你如何让调度程序使用这个上下文?

SynchronizationContext 早在 TPL 之前就已经存在(首先出现在 .NET 2.0 中)。它所做的一件事是管理异步操作请求。从您的帖子中不清楚您是否理解这一点。

更新: 对 QueueTask 的第一次调用间接来自 StartNew。对 QueueTask 的第二次调用间接来自对 ForEach 的调用 第三次对 QueueTask 的调用间接来自 QueueTask 中的 TryExecuteTask 对 QueueTask 的接下来的 4 次调用是针对传递给 ForEach 的主体。

根据负载,QueueTask 最多可能被调用 3 次。如果我在 QueueTask 进行调试和中断,QueueTask 只会被调用 7 次。

此时,由于您在 QueueTask 中执行的操作与 TPL 不同(即 TryExecuteTask 正在调用一个额外的操作),因此很难说为什么有时会有一些额外的 QueueTask 调用。这可能来自您实现 QueueTask 的方式,因为您有效地要求调度程序从已经异步执行的任务中排队另一个异步任务。我的猜测是,这只是时机。QueueTask 可以被如此快速地调用(因为这是通过另一个异步操作完成的),以至于 TryExecuteTask 不知道任务已经排队并强制执行任务(强制另一个调用 QueueTask)。

如果 QueueTask 实际上导致另一个对 QueueTask 的调用,因为它还没有安排它所在的任务,这就解释了为什么最多有 10 次对 QueueTask 的调用。正是 TryExeucteTask 调用导致每个 ForEach 主体的“双重”调用......

于 2012-08-08T20:29:23.560 回答