我将从对我如何理解几件事情的基本解释开始这一点,然后用一个 tldr 来结束这一切;如果人们只是想解决我在这里遇到的实际问题。如果我对这里的任何理解有误,请纠正我。
TPL 代表任务并行库,它是 .NET 4.0 试图进一步简化线程以方便开发人员使用的答案。如果您不熟悉它,(在非常基础的级别上)您启动一个新的 Task 对象并向其传递一个委托,然后该委托在从线程池获取的后台线程上运行(通过使用线程池而不是真正制作通过使用这些现有线程而不是创建和处置新线程来节省新线程、时间和资源)。
据我了解,C# 中的 Parallel.ForEach 命令将为它应该执行的每个委托生成一个新线程(可能来自线程池),但可能的例外是自动执行一个甚至可能更多的内联如果编译器认为它们将足够快地发生以提高效率,则迭代。
与我的目标最相关的背景信息:
我正在尝试制作一个启动任务的快速程序,以便与程序的其余部分同时运行。在此任务中,Parallel.ForEach 运行 3 次“迭代”。总的来说,我们希望程序现在总共运行 5 个线程(最多):1 个用于主线程,1 个用于实际任务,最多 3 个用于 Parallel.ForEach。每个 Thread 都有自己的目标要完成(尽管 Parallel.ForEach 都有相同的目标,但其相关 itemNumber 的值不同以计算。当主线程完成所有目标时,它使用 Task.Wait() 等待在完成任务上,它也等待 Parallel.ForEach 完成。然后使用并验证这些值。
tldr; 实际问题:
运行上述想法时,Parallel.ForEach 似乎正在初始化两倍于我预期的 SynchronizationContexts(本质上是另一个线程的 TPL 对象)并运行所有它们,但只等待预期数量的它们。因为 Parallel.ForEach().Wait() 命令以预期的线程运行数量完成,所以 Task 也会在它认为一切都完成时完成。然后主程序发现任务已经完成,并且当它验证当前没有更多后台线程在运行时,有时剩余的 Parrallel.ForEach() 尚未完成,因此会引发错误。
通过在每个 SynchronizationContext 的 post 调用(Async 方法启动器)时打印到调试窗口来验证线程的数量与我所说的匹配。每个线程还被一个主线程对象引用,否则该对象计划在完成任务时被处置,但由于由于未真正预期创建的未完成线程仍然有引用,因此处置不能正确发生。
Thread testThread = Thread.CurrentThread;
Task backgroundTask = taskFactory.StartNew(() =>
{
Thread rootTaskThread = Thread.CurrentThread;
Assert.AreNotEqual(testThread, rootTaskThread, "First task should not inline");
Thread.Sleep(TimeSpan.FromSeconds(2));
Parallel.ForEach(new[] { 1, 2, 3, 4 },
new ParallelOptions { TaskScheduler = taskFactory.Scheduler }, (int item) => {
Thread.Sleep(TimeSpan.FromSeconds(1));
});
});
在上面的示例中,主线程、backgroundTask 任务和 8 个 Parallel.ForEach 线程最终都存在,其中最后 9 个线程是在 SynchronizationContexts 上创建的。
我的自定义在 SynchronizationContext 中覆盖的唯一方法是 post,如下所示:
public override void Post(SendOrPostCallback d, object state){
Request requestOrNull = Request.ExistsForCurrentThread() ? Request.GetForCurrentThread() as Request : null;
Request.IAsyncContextData requestData = null;
if (requestOrNull != null){
requestData = requestOrNull.CaptureDataForNewThreadAndIncrementReferenceCount();
}
Debug.WriteLine("Task started - request data " + (requestData == null ? "DOES NOT EXIST" : "EXISTS"));
base.Post((object internalState) => {
// Capture the spawned thread state and restore the originating thread state
try{
if (requestData != null){
Request.AttachToAsynchronousContext(requestData);
}
d(state);
}
finally{
// Restore original spawned thread state
if (requestData != null){
// Disposes the request if this is the last reference to it
Request.DetachFromAsynchronousContext(requestData);
}
Debug.WriteLine("Task completed - request data " + (requestData == null ? "DOES NOT EXIST" : "EXISTS"));
}
}, state);
}
我认为 TaskScheduler 只做它所需的基本工作:
private readonly RequestSynchronizationContext context;
private readonly ConcurrentQueue<Task> tasks = new ConcurrentQueue<Task>();
public RequestTaskScheduler(RequestSynchronizationContext synchronizationContext)
{
this.context = synchronizationContext;
}
protected override void QueueTask(Task task){
this.tasks.Enqueue(task);
this.context.Post((object state) => {
Task nextTask;
if (this.tasks.TryDequeue(out nextTask))
this.TryExecuteTask(nextTask);
}, null);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued){
if (SynchronizationContext.Current == this.context)
return this.TryExecuteTask(task);
else
return false;
}
protected override IEnumerable<Task> GetScheduledTasks(){
return this.tasks.ToArray();
}
任务工厂:
public RequestTaskFactory(RequestTaskScheduler taskScheduler)
: base(taskScheduler)
{ }
关于为什么会发生这种情况的任何想法?