11

有关背景信息,请参阅此问题:

任务并行库中的任务如何影响 ActivityID?

该问题询问 Tasks 如何影响Trace.CorrelationManager.ActivityId。@Greg Samson 用一个测试程序回答了他自己的问题,表明 ActivityId 在任务上下文中是可靠的。测试程序在Task委托的开头设置一个ActivityId,休眠模拟工作,然后在最后检查ActivityId以确保它是相同的值(即它没有被另一个线程修改)。程序运行成功。

在研究线程、任务和并行操作的其他“上下文”选项(最终为日志记录提供更好的上下文)时,我遇到了Trace.CorrelationManager.LogicalOperationStack的一个奇怪问题(无论如何这对我来说很奇怪)。我已经在下面复制了我对他的问题的“答案”。

我认为它充分描述了我遇到的问题(在 Parallel.For 的上下文中使用时,Trace.CorrelationManager.LogicalOperationStack 显然已损坏 - 或其他东西,但前提是 Parallel.For 本身包含在逻辑操作中) .

以下是我的问题:

  1. Trace.CorrelationManager.LogicalOperationStack 是否可以与 Parallel.For 一起使用?如果是这样,如果逻辑操作已经在 Parallel.For 启动时生效,是否会有所不同?

  2. 是否有一种“正确”的方式将 LogicalOperationStack 与 Parallel.For 一起使用?我可以对这个示例程序进行不同的编码以使其“工作”吗?通过“作品”,我的意思是 LogicalOperationStack 始终具有预期的条目数,并且条目本身就是预期的条目。

我已经使用 Threads 和 ThreadPool 线程进行了一些额外的测试,但我必须返回并重试这些测试,看看我是否遇到了类似的问题。

我会说,似乎任务/并行线程和线程池线程确实从父线程“继承”了 Trace.CorrelationManager.ActivityId 和 Trace.CorrelationManager.LogicalOperationStack 值。这是意料之中的,因为这些值是由 CorrelationManager 使用CallContext的 LogicalSetData 方法(而不是 SetData)存储的。

再次,请参考这个问题以获取我在下面发布的“答案”的原始上下文:

任务并行库中的任务如何影响 ActivityID?

另请参阅 Microsoft 的 Parallel Extensions 论坛上的类似问题(迄今为止尚未回答):

http://social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9

[开始粘贴]

请原谅我将其发布为答案,因为它并不能真正回答您的问题,但是,它与您的问题有关,因为它涉及 CorrelationManager 行为和线程/任务/等。我一直在研究使用 CorrelationManager 的LogicalOperationStack(和StartLogicalOperation/StopLogicalOperation方法)在多线程场景中提供额外的上下文。

我以您的示例为例并对其进行了一些修改,以添加使用 Parallel.For 并行执行工作的能力。另外,我使用StartLogicalOperation/StopLogicalOperation括号(内部)DoLongRunningWork。从概念上讲,DoLongRunningWork每次执行时都会执行以下操作:

DoLongRunningWork
  StartLogicalOperation
  Thread.Sleep(3000)
  StopLogicalOperation

我发现,如果我将这些逻辑操作添加到您的代码中(或多或少),所有逻辑操作都保持同步(堆栈上的预期操作数和堆栈上的操作值始终为预期的)。

在我自己的一些测试中,我发现情况并非总是如此。逻辑操作堆栈正在“损坏”。我能想出的最好解释是,当“子”线程退出时,将 CallContext 信息“合并”到“父”线程上下文中导致“旧”子线程上下文信息(逻辑操作)为“由另一个同级子线程继承”。

问题也可能与 Parallel.For 显然使用主线程(至少在示例代码中,如所写)作为“工作线程”之一(或在并行域中应该调用的任何线程)这一事实有关。每当执行 DoLongRunningWork 时,都会启动一个新的逻辑操作(在开始时)并停止(在结束时)(即,推入 LogicalOperationStack 并从中弹出)。如果主线程已经有一个有效的逻辑操作,并且如果 DoLongRunningWork 在主线程上执行,则启动一个新的逻辑操作,因此主线程的 LogicalOperationStack 现在有两个操作。DoLongRunningWork 的任何后续执行(只要 DoLongRunningWork 的这种“迭代”在主线程上执行)将(显然)继承主线程'

我花了很长时间才弄清楚为什么我的示例中的 LogicalOperationStack 的行为与您的示例的修改版本不同。最后我看到,在我的代码中,我将整个程序括在一个逻辑操作中,而在我修改过的测试程序版本中,我没有。这意味着在我的测试程序中,每次执行我的“工作”(类似于 DoLongRunningWork),已经有一个有效的逻辑操作。在您的测试程序的修改版本中,我没有将整个程序括在逻辑操作中。

因此,当我修改您的测试程序以将整个程序括在逻辑操作中并且如果我使用 Parallel.For 时,我遇到了完全相同的问题。

使用上面的概念模型,这将成功运行:

Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation

虽然这最终会因为 LogicalOperationStack 明显不同步而断言:

StartLogicalOperation
Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation
StopLogicalOperation

这是我的示例程序。它与您的相似之处在于它具有操作 ActivityId 和 LogicalOperationStack 的 DoLongRunningWork 方法。我也有两种风格的 DoLongRunningWork。一种使用Tasks,一种使用Parallel.For。也可以执行每种风格,以使整个并行化操作包含在逻辑操作中或不包含在逻辑操作中。因此,总共有 4 种方式来执行并行操作。要尝试每一个,只需取消注释所需的“使用...”方法,重新编译并运行。 UseTasks, UseTasks(true), 并且UseParallelFor应该都运行完成。 UseParallelFor(true)将在某个时候断言,因为 LogicalOperationStack 没有预期的条目数。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace CorrelationManagerParallelTest
{
  class Program 
  {     
    static void Main(string[] args)     
    { 
      //UseParallelFor(true) will assert because LogicalOperationStack will not have expected
      //number of entries, all others will run to completion.

      UseTasks(); //Equivalent to original test program with only the parallelized
                      //operation bracketed in logical operation.
      ////UseTasks(true); //Bracket entire UseTasks method in logical operation
      ////UseParallelFor();  //Equivalent to original test program, but use Parallel.For
                             //rather than Tasks.  Bracket only the parallelized
                             //operation in logical operation.
      ////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation
    }       

    private static List<int> threadIds = new List<int>();     
    private static object locker = new object();     

    private static int mainThreadId = Thread.CurrentThread.ManagedThreadId;

    private static int mainThreadUsedInDelegate = 0;

    // baseCount is the expected number of entries in the LogicalOperationStack
    // at the time that DoLongRunningWork starts.  If the entire operation is bracketed
    // externally by Start/StopLogicalOperation, then baseCount will be 1.  Otherwise,
    // it will be 0.
    private static void DoLongRunningWork(int baseCount)     
    {
      lock (locker)
      {
        //Keep a record of the managed thread used.             
        if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
          threadIds.Add(Thread.CurrentThread.ManagedThreadId);

        if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
        {
          mainThreadUsedInDelegate++;
        }
      }         

      Guid lo1 = Guid.NewGuid();
      Trace.CorrelationManager.StartLogicalOperation(lo1);

      Guid g1 = Guid.NewGuid();         
      Trace.CorrelationManager.ActivityId = g1;

      Thread.Sleep(3000);         

      Guid g2 = Trace.CorrelationManager.ActivityId;
      Debug.Assert(g1.Equals(g2));

      //This assert, LogicalOperation.Count, will eventually fail if there is a logical operation
      //in effect when the Parallel.For operation was started.
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1));
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1));

      Trace.CorrelationManager.StopLogicalOperation();
    } 

    private static void UseTasks(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
      Task task = null;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Task[] allTasks = new Task[totalThreads];
      for (int i = 0; i < totalThreads; i++)
      {
        task = Task.Factory.StartNew(() =>
        {
          DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
        }, taskCreationOpt);
        allTasks[i] = task;
      }
      Task.WaitAll(allTasks);

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

    private static void UseParallelFor(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Parallel.For(0, totalThreads, i =>
      {
        DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
      });

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

  } 
}

LogicalOperationStack 是否可以与 Parallel.For (和/或其他线程/任务构造)一起使用或如何使用它的整个问题可能值得自己提出问题。也许我会发布一个问题。同时,我想知道您对此是否有任何想法(或者,我想知道您是否考虑过使用 LogicalOperationStack,因为 ActivityId 似乎是安全的)。

[结束粘贴]

有人对这个问题有任何想法吗?

4

2 回答 2

6

[开始更新]

我还在微软的 Parallel Extensions for .Net 支持论坛上问了这个问题,最终得到了Stephen Toub 的回答。事实证明,LogicalCallContext中存在导致 LogicalOperationStack 损坏的错误。还有一个很好的描述(在斯蒂芬对我对他的回答所做的回复的跟进中),它简要概述了 Parallel.For 在分配任务方面的工作原理以及为什么这会使 Parallel.For 容易受到该错误的影响。

在下面的回答中,我推测 LogicalOperationStack 与 Parallel.For 不兼容,因为 Parallel.For 使用主线程作为“工作”线程之一。根据斯蒂芬的解释,我的推测是不正确的。Parallel.For 确实使用主线程作为“工作”线程之一,但它不是简单地“按原样”使用。第一个任务在主线程上运行,但运行方式就像在新线程上运行一样。阅读斯蒂芬的描述以获取更多信息。

[结束更新]

据我所知,答案如下:

ActivityId 和 LogicalOperationStack 都通过CallContext.LogicalSetData存储。这意味着这些值将“流向”任何“子”线程。这非常酷,例如,您可以在多线程服务器的入口点设置 ActivityId(例如服务调用),并且最终从该入口点启动的所有线程都可以成为同一“活动”的一部分。同样,逻辑操作(通过LogicalOperationStack)也流向子线程。

关于 Trace.CorrelationManager.ActivityId:

ActivityId 似乎与我测试过的所有线程模型兼容:直接使用线程、使用 ThreadPool、使用任务、使用 Parallel.*。在所有情况下,ActivityId 都具有预期值。

关于 Trace.CorrelationManager.LogicalOperationStack:

LogicalOperationStack 似乎与大多数线程模型兼容,但与 Parallel.* 不兼容。直接使用线程、线程池和任务,LogicalOperationStack(在我的问题中提供的示例代码中进行操作)保持其完整性。LogicalOperationStack 的内容始终如预期的那样。

LogicalOperationStack 与 Parallel.For 不兼容。如果一个逻辑操作是“有效的”,也就是说,如果您在启动 Parallel.* 操作之前调用了 CorrelationManager.StartLogicalOperation,然后您在 Paralle.* 的上下文中启动一个新的逻辑操作(即在委托中) ,然后LogicalOperationStack 将被损坏。(我应该说它可能会损坏。Parallel.* 可能不会创建任何额外的线程,这意味着 LogicalOperationStack 将是安全的)。

问题源于 Parallel.* 使用主线程(或更准确地说,可能是启动并行操作的线程)作为其“工作”线程之一。这意味着随着“逻辑操作”在与“主”线程相同的“工作”线程中启动和停止,“主”线程的 LogicalOperationStack 正在被修改。即使调用代码(即委托)正确地维护堆栈(确保每个 StartLogicalOperation 都通过相应的 StopLogicalOperation “停止”),“主”线程堆栈也会被修改。最终似乎(对我来说,无论如何),“主”线程的 LogicalOperationStack 本质上是由两个不同的“逻辑”线程修改的:“主”线程

我不知道为什么这不起作用的深层细节(至少正如我所期望的那样)。我最好的猜测是,每次委托在线程(与主线程不同)上执行时,线程“继承”主线程的 LogicalOperationStack 的当前状态。如果委托当前正在主线程上执行(作为工作线程重用),并且已经启动了一个逻辑操作,那么其他并行化委托中的一个(或多个)将“继承”主线程的 LogicalOperationStack,它现在具有一个(或多个)新的逻辑操作生效!

FWIW,我实现了(主要是为了测试,我目前并没有真正使用它),下面的“逻辑堆栈”来模仿 LogicalOperationStack,但这样做的方式是它可以与 Parallel 一起使用。*随意尝试拿出来和/或使用它。要进行测试,请将调用替换为

Trace.CorrelationManager.StartLogicalOperation/StopLogicalOperation

在我原来的问题的示例代码中调用

LogicalOperation.OperationStack.Push()/Pop().


//OperationStack.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

using System.Runtime.Remoting.Messaging;

namespace LogicalOperation
{
  public static class OperationStack
  {
    private const string OperationStackSlot = "OperationStackSlot";

    public static IDisposable Push(string operation)
    {
      OperationStackItem parent = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;
      OperationStackItem op = new OperationStackItem(parent, operation);
      CallContext.LogicalSetData(OperationStackSlot, op);
      return op;
    }

    public static object Pop()
    {
      OperationStackItem current = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;

      if (current != null)
      {
        CallContext.LogicalSetData(OperationStackSlot, current.Parent);
        return current.Operation;
      }
      else
      {
        CallContext.FreeNamedDataSlot(OperationStackSlot);
      }
      return null;
    }

    public static object Peek()
    {
      OperationStackItem top = Top();
      return top != null ? top.Operation : null;
    }

    internal static OperationStackItem Top()
    {
      OperationStackItem top = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;
      return top;
    }

    public static IEnumerable<object> Operations()
    {
      OperationStackItem current = Top();
      while (current != null)
      {
        yield return current.Operation;
        current = current.Parent;
      }
    }

    public static int Count
    {
      get
      {
        OperationStackItem top = Top();
        return top == null ? 0 : top.Depth;
      }
    }

    public static IEnumerable<string> OperationStrings()
    {
      foreach (object o in Operations())
      {
        yield return o.ToString();
      }
    }
  }
}


//OperationStackItem.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace LogicalOperation
{
  public class OperationStackItem : IDisposable
  {
    private OperationStackItem parent = null;
    private object operation;
    private int depth;
    private bool disposed = false;

    internal OperationStackItem(OperationStackItem parentOperation, object operation)
    {
      parent = parentOperation;
      this.operation = operation;
      depth = parent == null ? 1 : parent.Depth + 1;
    }

    internal object Operation { get { return operation; } }
    internal int Depth { get { return depth; } }

    internal OperationStackItem Parent { get { return parent; } }

    public override string ToString()
    {
      return operation != null ? operation.ToString() : "";
    }

    #region IDisposable Members

    public void Dispose()
    {
      if (disposed) return;

      OperationStack.Pop();

      disposed = true;
    }

    #endregion
  }
}

这是受 Brent VanderMeide 在这里描述的范围对象的启发:http ://www.dnrtv.com/default.aspx?showNum=114

你可以像这样使用这个类:

public void MyFunc()
{
  using (LogicalOperation.OperationStack.Push("MyFunc"))
  {
    MyOtherFunc();
  }
}

public void MyOtherFunc()
{
  using (LogicalOperation.OperationStack.Push("MyOtherFunc"))
  {
    MyFinalFunc();
  }
}

public void MyFinalFunc()
{
  using (LogicalOperation.OperationStack.Push("MyFinalFunc"))
  {
    Console.WriteLine("Hello");
  }
}
于 2011-02-07T19:56:00.350 回答
2

我正在研究一种在大量使用 TPL 的应用程序中可以轻松工作的逻辑堆栈的方法。我决定使用LogicalOperationStack,因为它在不更改现有代码的情况下完成了我需要的所有工作。但后来我读到了LogicalCallContext中的一个错误:

https://connect.microsoft.com/VisualStudio/feedback/details/609929/logicalcallcontext-clone-bug-when-correlationmanager-slot-is-present

所以我试图找到一个解决这个错误的方法,我认为我让它适用于 TPL(谢谢 ILSpy):

public static class FixLogicalOperationStackBug
{
    private static bool _fixed = false;

    public static void Fix()
    {
        if (!_fixed)
        {
            _fixed = true;

            Type taskType = typeof(Task);
            var s_ecCallbackField = taskType.GetFields(BindingFlags.Static | BindingFlags.NonPublic).First(f => f.Name == "s_ecCallback");
            ContextCallback s_ecCallback = (ContextCallback)s_ecCallbackField.GetValue(null);

            ContextCallback injectedCallback = new ContextCallback(obj =>
            {
                // Next line will set the private field m_IsCorrelationMgr of LogicalCallContext which isn't cloned
                CallContext.LogicalSetData("System.Diagnostics.Trace.CorrelationManagerSlot", Trace.CorrelationManager.LogicalOperationStack);
                s_ecCallback(obj);
            });

            s_ecCallbackField.SetValue(null, injectedCallback);
        }
    }
}
于 2012-02-19T22:49:55.317 回答