2010-12-02 71 views
11

在使用任务并行库之前,我经常使用CorrelationManager.ActivityId跟踪多线程的跟踪/错误报告。任务并行库中的任务如何影响ActivityID?

ActivityId存储在线程本地存储中,所以每个线程都有它自己的副本。这个想法是,当你启动一个线程(活动)时,你分配一个新的ActivityId。 ActivityId将与任何其他跟踪信息一起写入日志,使单个“活动”的跟踪信息成为可能。这对于WCF非常有用,因为ActivityId可以结转到服务组件。

这里是我谈论的例子:

static void Main(string[] args) 
{ 
    ThreadPool.QueueUserWorkItem(new WaitCallback((o) => 
    { 
     DoWork(); 
    })); 
} 

static void DoWork() 
{ 
    try 
    { 
     Trace.CorrelationManager.ActivityId = Guid.NewGuid(); 
     //The functions below contain tracing which logs the ActivityID. 
     CallFunction1(); 
     CallFunction2(); 
     CallFunction3(); 
    } 
    catch (Exception ex) 
    { 
     Trace.Write(Trace.CorrelationManager.ActivityId + " " + ex.ToString()); 
    } 
} 

现在,随着TPL,我的理解是多任务共享线程。这是否意味着ActivityId很容易被重新初始化(通过其他任务)?有没有新的机制来处理活动追踪?

+0

我没有什么优惠,但我也对这个问题感兴趣。似乎同样的问题也适用于使用CallContext.LogicalSetData的信息集,因为这是Trace.CorrelationManager用来存储ActivityId和LogicalOperationStack的技术。 – wageoghe 2010-12-14 19:47:08

+0

@wageohe - 我终于可以今天这个测试,已经张贴了我的结果:) – 2010-12-15 00:35:58

+0

我张贴一些更多的细节在我的答案。我还在这里发布了一个关于SO的另一个答案的链接,这是我在这里问到的一个新问题,以及我在Microsoft的Parallel Extensions论坛上提问(但尚未在2011年1月21日回答)的问题。也许你会发现有用的信息,也许不会。 – wageoghe 2011-01-21 15:54:47

回答

6

我跑了一些实验,结果我的问题中的假设是不正确的 - 用TPL创建的多个任务不能同时在同一个线程上运行。

ThreadLocalStorage在.NET 4.0中与TPL一起使用是安全的,因为线程一次只能由一个任务使用。

这些任务可以共享并发的基础上接受采访时,我听到关于C#5.0DotNetRocks(抱歉,我不记得这表明它是)线程的假设 - 所以我的问题可能(也可能不会)即将成为相关。

我的实验启动了许多任务,并记录了运行了多少任务,他们花了多长时间,以及消耗了多少线程。如果有人想重复,代码如下。

class Program 
{ 
    static void Main(string[] args) 
    { 
     int totalThreads = 100; 
     TaskCreationOptions taskCreationOpt = TaskCreationOptions.None; 
     Task task = null; 
     Stopwatch stopwatch = new Stopwatch(); 
     stopwatch.Start(); 
     Task[] allTasks = new Task[totalThreads]; 
     for (int i = 0; i < totalThreads; i++) 
     { 
      task = Task.Factory.StartNew(() => 
      { 
       DoLongRunningWork(); 
      }, taskCreationOpt); 

      allTasks[i] = task; 
     } 

     Task.WaitAll(allTasks); 
     stopwatch.Stop(); 

     Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds)); 
     Console.WriteLine(String.Format("Used {0} threads", threadIds.Count)); 
     Console.ReadKey(); 
    } 


    private static List<int> threadIds = new List<int>(); 
    private static object locker = new object(); 
    private static void DoLongRunningWork() 
    { 
     lock (locker) 
     { 
      //Keep a record of the managed thread used. 
      if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId)) 
       threadIds.Add(Thread.CurrentThread.ManagedThreadId); 
     } 
     Guid g1 = Guid.NewGuid(); 
     Trace.CorrelationManager.ActivityId = g1; 
     Thread.Sleep(3000); 
     Guid g2 = Trace.CorrelationManager.ActivityId; 
     Debug.Assert(g1.Equals(g2)); 
    } 
} 

输出(当然这将取决于机器上)是:

Completed 100 tasks in 23097 milliseconds 
Used 23 threads 

更改taskCreationOpt到TaskCreationOptions.LongRunning给出不同的结果:

Completed 100 tasks in 3458 milliseconds 
Used 100 threads 
3

请原谅我的这个帖子作为一个答案,因为它不是真的回答你的问题,但是,它涉及到你的问题,因为它涉及CorrelationManager行为和线程/任务/等。我一直在考虑使用CorrelationManager的LogicalOperationStack(和StartLogicalOperation/StopLogicalOperation方法)在多线程场景中提供额外的上下文。

我把你的例子稍加修改,添加使用Parallel.For并行执行工作的能力。此外,我使用StartLogicalOperation/StopLogicalOperation括号(内部)DoLongRunningWork。从概念上讲,DoLongRunningWork做这样的事情是每次执行:

DoLongRunningWork 
    StartLogicalOperation 
    Thread.Sleep(3000) 
    StopLogicalOperation 

我发现,如果我添加这些逻辑操作代码(或多或少是),所有的逻辑operatins的保持同步(总是预期的堆栈操作次数和堆栈操作的值始终如预期)。

在我的一些测试中,我发现并非总是如此。逻辑操作堆栈正在“损坏”。我可以想到的最好的解释是,当“子”线程退出时,将“CallContext”信息“合并”回“父”线程上下文导致“旧”子线程上下文信息(逻辑操作)为“继承“由另一个兄弟的子线程。

这个问题也可能与以下事实有关:Parallel.For显然使用主线程(至少在示例代码中,如写入的那样)作为“工作线程”之一(或者任何它们应该在并行域)。无论何时执行DoLongRunningWork,都会启动一个新的逻辑操作(开始时)并停止(结束时)(也就是将其压入LogicalOperationStack并从中弹出)。如果主线程已经具有逻辑操作并且DoLongRunningWork在主线程上执行,则启动新的逻辑操作,因此主线程的LogicalOperationStack现在具有两个操作。任何后续的DoLongRunningWork执行(只要DoLongRunningWork的这个“迭代”在主线程上执行)将(显然)继承主线程的LogicalOperationStack(现在有两个操作,而不仅仅是一个预期的操作)。

我花了很长时间才弄清楚为什么LogicalOperationStack的行为在我的示例中与我的示例的修改版本中的行为不同。最后,我看到在我的代码中,我用逻辑操作括住了整个程序,而在我的修改版本的测试程序中,我没有。其含义是,在我的测试程序中,每次执行我的“工作”(类似于DoLongRunningWork),已经有一个合理的操作。在我的测试程序的修改版本中,我没有将逻辑操作中的整个程序括起来。

所以,当我修改你的测试程序来将逻辑操作中的整个程序括起来,并且如果我使用的是Parallel.For,我遇到了完全相同的问题。

使用上面的概念模型,这将成功运行:

Parallel.For 
    DoLongRunningWork 
    StartLogicalOperation 
    Sleep(3000) 
    StopLogicalOperation 

虽然这最终将导致断言到一个明显的不同步LogicalOperationStack的:

StartLogicalOperation 
Parallel.For 
    DoLongRunningWork 
    StartLogicalOperation 
    Sleep(3000) 
    StopLogicalOperation 
StopLogicalOperation 

这里是我的示例程序。它和你的类似,它有一个操作ActivityId和LogicalOperationStack的DoLongRunningWork方法。我也有两种踢DoLongRunningWork的风格。一种风味使用任务,一种使用Parallel.For。也可以执行每种风格,以使得整个并行操作被封闭在逻辑操作中或不被操作。所以,总共有4种方式来执行并行操作。要尝试每一个,只需取消注释所需的“使用...”方法,重新编译并运行。 UseTasks,UseTasks(true)UseParallelFor应该全部运行完成。 UseParallelFor(true)将在某个时刻断言,因为LogicalOperationStack没有预期的条目数。

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Diagnostics; 
using System.Threading; 
using System.Threading.Tasks; 

namespace CorrelationManagerParallelTest 
{ 
    class Program 
    {  
    static void Main(string[] args)  
    { 
     //UseParallelFor(true) will assert because LogicalOperationStack will not have expected 
     //number of entries, all others will run to completion. 

     UseTasks(); //Equivalent to original test program with only the parallelized 
         //operation bracketed in logical operation. 
     ////UseTasks(true); //Bracket entire UseTasks method in logical operation 
     ////UseParallelFor(); //Equivalent to original test program, but use Parallel.For 
          //rather than Tasks. Bracket only the parallelized 
          //operation in logical operation. 
     ////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation 
    }  

    private static List<int> threadIds = new List<int>();  
    private static object locker = new object();  

    private static int mainThreadId = Thread.CurrentThread.ManagedThreadId; 

    private static int mainThreadUsedInDelegate = 0; 

    // baseCount is the expected number of entries in the LogicalOperationStack 
    // at the time that DoLongRunningWork starts. If the entire operation is bracketed 
    // externally by Start/StopLogicalOperation, then baseCount will be 1. Otherwise, 
    // it will be 0. 
    private static void DoLongRunningWork(int baseCount)  
    { 
     lock (locker) 
     { 
     //Keep a record of the managed thread used.    
     if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId)) 
      threadIds.Add(Thread.CurrentThread.ManagedThreadId); 

     if (Thread.CurrentThread.ManagedThreadId == mainThreadId) 
     { 
      mainThreadUsedInDelegate++; 
     } 
     }   

     Guid lo1 = Guid.NewGuid(); 
     Trace.CorrelationManager.StartLogicalOperation(lo1); 

     Guid g1 = Guid.NewGuid();   
     Trace.CorrelationManager.ActivityId = g1; 

     Thread.Sleep(3000);   

     Guid g2 = Trace.CorrelationManager.ActivityId; 
     Debug.Assert(g1.Equals(g2)); 

     //This assert, LogicalOperation.Count, will eventually fail if there is a logical operation 
     //in effect when the Parallel.For operation was started. 
     Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1)); 
     Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1)); 

     Trace.CorrelationManager.StopLogicalOperation(); 
    } 

    private static void UseTasks(bool encloseInLogicalOperation = false) 
    { 
     int totalThreads = 100; 
     TaskCreationOptions taskCreationOpt = TaskCreationOptions.None; 
     Task task = null; 
     Stopwatch stopwatch = new Stopwatch(); 
     stopwatch.Start(); 

     if (encloseInLogicalOperation) 
     { 
     Trace.CorrelationManager.StartLogicalOperation(); 
     } 

     Task[] allTasks = new Task[totalThreads]; 
     for (int i = 0; i < totalThreads; i++) 
     { 
     task = Task.Factory.StartNew(() => 
     { 
      DoLongRunningWork(encloseInLogicalOperation ? 1 : 0); 
     }, taskCreationOpt); 
     allTasks[i] = task; 
     } 
     Task.WaitAll(allTasks); 

     if (encloseInLogicalOperation) 
     { 
     Trace.CorrelationManager.StopLogicalOperation(); 
     } 

     stopwatch.Stop(); 
     Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds)); 
     Console.WriteLine(String.Format("Used {0} threads", threadIds.Count)); 
     Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate)); 

     Console.ReadKey(); 
    } 

    private static void UseParallelFor(bool encloseInLogicalOperation = false) 
    { 
     int totalThreads = 100; 
     Stopwatch stopwatch = new Stopwatch(); 
     stopwatch.Start(); 

     if (encloseInLogicalOperation) 
     { 
     Trace.CorrelationManager.StartLogicalOperation(); 
     } 

     Parallel.For(0, totalThreads, i => 
     { 
     DoLongRunningWork(encloseInLogicalOperation ? 1 : 0); 
     }); 

     if (encloseInLogicalOperation) 
     { 
     Trace.CorrelationManager.StopLogicalOperation(); 
     } 

     stopwatch.Stop(); 
     Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds)); 
     Console.WriteLine(String.Format("Used {0} threads", threadIds.Count)); 
     Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate)); 

     Console.ReadKey(); 
    } 

    } 
} 

这整个如果LogicalOperationStack可以用的Parallel.For使用的问题(和/或其他线程/任务构建体)或它如何被使用的可能是值得它自己的问题。也许我会发布一个问题。同时,我想知道你是否对此有任何想法(或者,我想知道是否考虑过使用LogicalOperationStack,因为ActivityId看起来是安全的)。

[编辑]

见我的回答this question有关使用LogicalOperationStack和/或CallContext.LogicalSetData与一些不同的线程/线程池/任务/并行contstructs的更多信息。

也看到这里,我的问题上SO约LogicalOperationStack和并行扩展: Is CorrelationManager.LogicalOperationStack compatible with Parallel.For, Tasks, Threads, etc

最后,还看到我的问题在这里对微软的并行扩展论坛: http://social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9

在我的测试中,它看起来像跟踪。如果您在主线程中启动逻辑操作,然后在委托中启动/停止逻辑操作,则CorrelationManager.LogicalOperationStack可能会在使用Parallel.For或Parallel.Invoke时损坏。在我的测试(见上述两种链接)的LogicalOperationStack应该总是恰好有2项时DoLongRunningWork正在执行(如果我使用各种技术DoLongRunningWork的踢前开始在主线程的逻辑运算)。所以,通过“损坏”我的意思是说,LogicalOperationStack最终会有超过2个条目。

从我可以告诉,这可能是因为和的Parallel.For使用Parallel.Invoke主线程为“工人”的一个线程来执行DoLongRunningWork动作。

使用存储在CallContext.LogicalSetData中的堆栈来模拟LogicalOperationStack(类似于通过CallContext.SetData存储的log4net的LogicalThreadContext.Stacks)的行为会产生更糟的结果。如果我正在使用这样的堆栈来维护上下文,那么在几乎所有的情况下,它都会被损坏(即没有预期的条目数),因为我在主线程中有一个“逻辑操作”,每次迭代都有一个逻辑操作/执行DoLongRunningWork委托。