有关背景信息,请参阅此问题:
该问题询问 Tasks 如何影响Trace.CorrelationManager.ActivityId。@Greg Samson 用一个测试程序回答了他自己的问题,表明 ActivityId 在任务上下文中是可靠的。测试程序在Task委托的开头设置一个ActivityId,休眠模拟工作,然后在最后检查ActivityId以确保它是相同的值(即它没有被另一个线程修改)。程序运行成功。
在研究线程、任务和并行操作的其他“上下文”选项(最终为日志记录提供更好的上下文)时,我遇到了Trace.CorrelationManager.LogicalOperationStack的一个奇怪问题(无论如何这对我来说很奇怪)。我已经在下面复制了我对他的问题的“答案”。
我认为它充分描述了我遇到的问题(在 Parallel.For 的上下文中使用时,Trace.CorrelationManager.LogicalOperationStack 显然已损坏 - 或其他东西,但前提是 Parallel.For 本身包含在逻辑操作中) .
以下是我的问题:
Trace.CorrelationManager.LogicalOperationStack 是否可以与 Parallel.For 一起使用?如果是这样,如果逻辑操作已经在 Parallel.For 启动时生效,是否会有所不同?
是否有一种“正确”的方式将 LogicalOperationStack 与 Parallel.For 一起使用?我可以对这个示例程序进行不同的编码以使其“工作”吗?通过“作品”,我的意思是 LogicalOperationStack 始终具有预期的条目数,并且条目本身就是预期的条目。
我已经使用 Threads 和 ThreadPool 线程进行了一些额外的测试,但我必须返回并重试这些测试,看看我是否遇到了类似的问题。
我会说,似乎任务/并行线程和线程池线程确实从父线程“继承”了 Trace.CorrelationManager.ActivityId 和 Trace.CorrelationManager.LogicalOperationStack 值。这是意料之中的,因为这些值是由 CorrelationManager 使用CallContext的 LogicalSetData 方法(而不是 SetData)存储的。
再次,请参考这个问题以获取我在下面发布的“答案”的原始上下文:
另请参阅 Microsoft 的 Parallel Extensions 论坛上的类似问题(迄今为止尚未回答):
[开始粘贴]
请原谅我将其发布为答案,因为它并不能真正回答您的问题,但是,它与您的问题有关,因为它涉及 CorrelationManager 行为和线程/任务/等。我一直在研究使用 CorrelationManager 的LogicalOperationStack
(和StartLogicalOperation/StopLogicalOperation
方法)在多线程场景中提供额外的上下文。
我以您的示例为例并对其进行了一些修改,以添加使用 Parallel.For 并行执行工作的能力。另外,我使用StartLogicalOperation/StopLogicalOperation
括号(内部)DoLongRunningWork
。从概念上讲,DoLongRunningWork
每次执行时都会执行以下操作:
DoLongRunningWork
StartLogicalOperation
Thread.Sleep(3000)
StopLogicalOperation
我发现,如果我将这些逻辑操作添加到您的代码中(或多或少),所有逻辑操作都保持同步(堆栈上的预期操作数和堆栈上的操作值始终为预期的)。
在我自己的一些测试中,我发现情况并非总是如此。逻辑操作堆栈正在“损坏”。我能想出的最好解释是,当“子”线程退出时,将 CallContext 信息“合并”到“父”线程上下文中导致“旧”子线程上下文信息(逻辑操作)为“由另一个同级子线程继承”。
问题也可能与 Parallel.For 显然使用主线程(至少在示例代码中,如所写)作为“工作线程”之一(或在并行域中应该调用的任何线程)这一事实有关。每当执行 DoLongRunningWork 时,都会启动一个新的逻辑操作(在开始时)并停止(在结束时)(即,推入 LogicalOperationStack 并从中弹出)。如果主线程已经有一个有效的逻辑操作,并且如果 DoLongRunningWork 在主线程上执行,则启动一个新的逻辑操作,因此主线程的 LogicalOperationStack 现在有两个操作。DoLongRunningWork 的任何后续执行(只要 DoLongRunningWork 的这种“迭代”在主线程上执行)将(显然)继承主线程'
我花了很长时间才弄清楚为什么我的示例中的 LogicalOperationStack 的行为与您的示例的修改版本不同。最后我看到,在我的代码中,我将整个程序括在一个逻辑操作中,而在我修改过的测试程序版本中,我没有。这意味着在我的测试程序中,每次执行我的“工作”(类似于 DoLongRunningWork),已经有一个有效的逻辑操作。在您的测试程序的修改版本中,我没有将整个程序括在逻辑操作中。
因此,当我修改您的测试程序以将整个程序括在逻辑操作中并且如果我使用 Parallel.For 时,我遇到了完全相同的问题。
使用上面的概念模型,这将成功运行:
Parallel.For
DoLongRunningWork
StartLogicalOperation
Sleep(3000)
StopLogicalOperation
虽然这最终会因为 LogicalOperationStack 明显不同步而断言:
StartLogicalOperation
Parallel.For
DoLongRunningWork
StartLogicalOperation
Sleep(3000)
StopLogicalOperation
StopLogicalOperation
这是我的示例程序。它与您的相似之处在于它具有操作 ActivityId 和 LogicalOperationStack 的 DoLongRunningWork 方法。我也有两种风格的 DoLongRunningWork。一种使用Tasks,一种使用Parallel.For。也可以执行每种风格,以使整个并行化操作包含在逻辑操作中或不包含在逻辑操作中。因此,总共有 4 种方式来执行并行操作。要尝试每一个,只需取消注释所需的“使用...”方法,重新编译并运行。 UseTasks
, UseTasks(true)
, 并且UseParallelFor
应该都运行完成。 UseParallelFor(true)
将在某个时候断言,因为 LogicalOperationStack 没有预期的条目数。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace CorrelationManagerParallelTest
{
class Program
{
static void Main(string[] args)
{
//UseParallelFor(true) will assert because LogicalOperationStack will not have expected
//number of entries, all others will run to completion.
UseTasks(); //Equivalent to original test program with only the parallelized
//operation bracketed in logical operation.
////UseTasks(true); //Bracket entire UseTasks method in logical operation
////UseParallelFor(); //Equivalent to original test program, but use Parallel.For
//rather than Tasks. Bracket only the parallelized
//operation in logical operation.
////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation
}
private static List<int> threadIds = new List<int>();
private static object locker = new object();
private static int mainThreadId = Thread.CurrentThread.ManagedThreadId;
private static int mainThreadUsedInDelegate = 0;
// baseCount is the expected number of entries in the LogicalOperationStack
// at the time that DoLongRunningWork starts. If the entire operation is bracketed
// externally by Start/StopLogicalOperation, then baseCount will be 1. Otherwise,
// it will be 0.
private static void DoLongRunningWork(int baseCount)
{
lock (locker)
{
//Keep a record of the managed thread used.
if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
threadIds.Add(Thread.CurrentThread.ManagedThreadId);
if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
{
mainThreadUsedInDelegate++;
}
}
Guid lo1 = Guid.NewGuid();
Trace.CorrelationManager.StartLogicalOperation(lo1);
Guid g1 = Guid.NewGuid();
Trace.CorrelationManager.ActivityId = g1;
Thread.Sleep(3000);
Guid g2 = Trace.CorrelationManager.ActivityId;
Debug.Assert(g1.Equals(g2));
//This assert, LogicalOperation.Count, will eventually fail if there is a logical operation
//in effect when the Parallel.For operation was started.
Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1));
Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1));
Trace.CorrelationManager.StopLogicalOperation();
}
private static void UseTasks(bool encloseInLogicalOperation = false)
{
int totalThreads = 100;
TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
Task task = null;
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StartLogicalOperation();
}
Task[] allTasks = new Task[totalThreads];
for (int i = 0; i < totalThreads; i++)
{
task = Task.Factory.StartNew(() =>
{
DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
}, taskCreationOpt);
allTasks[i] = task;
}
Task.WaitAll(allTasks);
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StopLogicalOperation();
}
stopwatch.Stop();
Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));
Console.ReadKey();
}
private static void UseParallelFor(bool encloseInLogicalOperation = false)
{
int totalThreads = 100;
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StartLogicalOperation();
}
Parallel.For(0, totalThreads, i =>
{
DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
});
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StopLogicalOperation();
}
stopwatch.Stop();
Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));
Console.ReadKey();
}
}
}
LogicalOperationStack 是否可以与 Parallel.For (和/或其他线程/任务构造)一起使用或如何使用它的整个问题可能值得自己提出问题。也许我会发布一个问题。同时,我想知道您对此是否有任何想法(或者,我想知道您是否考虑过使用 LogicalOperationStack,因为 ActivityId 似乎是安全的)。
[结束粘贴]
有人对这个问题有任何想法吗?