4

我正在使用TPL,但是我发现它使用了棘手的单元测试代码。是否有一种方法来限制任务并行库使用的线程?

我试图不要introduce a wrapper,因为我觉得它可能会引入问题。

我知道你可以在TPL中设置处理器亲和力,但真正好的是设置线程最大值(可能是每个应用程序域)。因此,将线程最大值设置为1时,TPL将被强制使用任何线程。

您认为如何?这是可能的(我敢肯定它不是),应该它有可能吗?

编辑:这里有一个例子

public class Foo 
{ 
    public Foo() 
    { 
     Task.Factory.StartNew(() => somethingLong()) 
      .ContinueWith(a => Bar = 1) ; 
    } 
} 

[Test] public void Foo_should_set_Bar_to_1() 
{ 
    Assert.Equal(1, new Foo().Bar) ; 
} 

测试可能将无法​​通过,除非我引入一个延迟。我想要有类似Task.MaximumThreads=1的东西,这样TPL将连续运行。

+0

我怀疑你正在做错事情。你想要做什么? – 2010-08-24 03:32:10

+0

基本上,我希望我的代码在测试时顺序运行,以便在单元测试中不会结束睡眠。 – 2010-08-24 09:18:42

+1

在某些情况下,您可能能够使用Task.WaitAll()让您的测试等待,直到任务完成,然后您可以断言状态。在其他人中,将逻辑从任务中提取出来并严格按照顺序进行测试可能是正确的做法。如何用一个具体的例子来更新你的问题,我可以尝试提供一个解决方案。 – 2010-08-24 19:42:14

回答

4

您可以创建自己的TaskScheduler类,从TaskScheduler派生,将其传递到TaskFactory。现在,您可以创建任何Task对象,该对象与调度程序一起运行。

无需将其设置为使用一个线程。

然后,就在你断言之前,就打电话给Dispose()就可以了。

public void Dispose() 
{ 
    if (tasks != null) 
    { 
     tasks.CompleteAdding(); 

     foreach (var thread in threads) thread.Join(); 

     tasks.Dispose(); 
     tasks = null; 
    } 
} 

这将保证所有的任务已运行 - :在内部,如果你遵循的样品在那里写一个TaskScheduler它会做这样的事情。现在你可以继续你的断言。

如果您想检查进展情况,您也可以在任务运行后使用ContinueWith(...)添加断言。

+0

谢谢,这是一个非常有趣的想法。我会看一看。 – 2010-09-06 07:09:10

2

真的,这对于lambda重码的可测试性而言,更多的是与TPL有关的问题。 Hightechrider的建议是一个很好的建议,但基本上你的测试仍然在测试TPL,就像它们是你的代码一样。当第一个任务结束并且ContinueWith开始下一个任务时,您并不需要测试它。

如果你的lambdas里面的代码非常大,那么把它拉出到一个更明确定义参数的更可测试的方法中可能会导致更容易阅读和更易测试的代码。你可以编写单元测试。在可能的情况下,我尝试限制或删除单元测试中的并行性。

话虽如此,我想看看调度程序的方法是否可行。如果你是想摆脱需要重载可以在Task.Factory.ContinueWhenAll(...)包裹单元测试代码构造下面是一个使用修改StaTaskScheduler实现从http://code.msdn.microsoft.com/ParExtSamples

using System; 
    using System.Collections.Concurrent; 
    using System.Collections.Generic; 
    using System.Linq; 
    using System.Threading; 
    using System.Threading.Tasks; 
    using Xunit; 

    namespace Example 
    { 
     public class Foo 
     { 
     private TaskScheduler _scheduler; 

    public int Bar { get; set; } 

    private void SomethingLong() 
    { 
     Thread.SpinWait(10000); 
    } 

    public Foo() 
     : this(TaskScheduler.Default) 
    { 
    } 

    public Foo(TaskScheduler scheduler) 
    { 
     _scheduler = scheduler; 
    } 

    public void DoWork() 
    { 
     var factory = new TaskFactory(_scheduler); 

     factory.StartNew(() => SomethingLong()) 
     .ContinueWith(a => Bar = 1, _scheduler); 
    } 
    } 

    public class FooTests 
    { 
    [Fact] 
    public void Foo_should_set_Bar_to_1() 
    { 
     var sch = new StaTaskScheduler(3); 
     var target = new Foo(sch); 
     target.DoWork(); 

     sch.Dispose(); 
     Assert.Equal(1, target.Bar); 
    } 
    } 

    public sealed class StaTaskScheduler : TaskScheduler, IDisposable 
    { 
    /// <summary>Stores the queued tasks to be executed by our pool of STA threads.</summary> 
    private BlockingCollection<Task> _tasks; 
    /// <summary>The STA threads used by the scheduler.</summary> 
    private readonly List<Thread> _threads; 

    /// <summary>Initializes a new instance of the StaTaskScheduler class with the specified concurrency level.</summary> 
    /// <param name="numberOfThreads">The number of threads that should be created and used by this scheduler.</param> 
    public StaTaskScheduler(int numberOfThreads) 
    { 
     // Validate arguments 
     if (numberOfThreads < 1) throw new ArgumentOutOfRangeException("concurrencyLevel"); 

     // Initialize the tasks collection 
     _tasks = new BlockingCollection<Task>(); 

     // Create the threads to be used by this scheduler 
     _threads = Enumerable.Range(0, numberOfThreads).Select(i => 
     { 
     var thread = new Thread(() => 
     { 
      // Continually get the next task and try to execute it. 
      // This will continue until the scheduler is disposed and no more tasks remain. 
      foreach (var t in _tasks.GetConsumingEnumerable()) 
      { 
      TryExecuteTask(t); 
      } 
     }); 
     thread.IsBackground = true; 
     // NO STA REQUIREMENT! 
     // thread.SetApartmentState(ApartmentState.STA); 
     return thread; 
     }).ToList(); 

     // Start all of the threads 
     _threads.ForEach(t => t.Start()); 
    } 

    /// <summary>Queues a Task to be executed by this scheduler.</summary> 
    /// <param name="task">The task to be executed.</param> 
    protected override void QueueTask(Task task) 
    { 
     // Push it into the blocking collection of tasks 
     _tasks.Add(task); 
    } 

    /// <summary>Provides a list of the scheduled tasks for the debugger to consume.</summary> 
    /// <returns>An enumerable of all tasks currently scheduled.</returns> 
    protected override IEnumerable<Task> GetScheduledTasks() 
    { 
     // Serialize the contents of the blocking collection of tasks for the debugger 
     return _tasks.ToArray(); 
    } 

    /// <summary>Determines whether a Task may be inlined.</summary> 
    /// <param name="task">The task to be executed.</param> 
    /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued.</param> 
    /// <returns>true if the task was successfully inlined; otherwise, false.</returns> 
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) 
    { 
     // Try to inline if the current thread is STA 
     return 
     Thread.CurrentThread.GetApartmentState() == ApartmentState.STA && 
     TryExecuteTask(task); 
    } 

    /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary> 
    public override int MaximumConcurrencyLevel 
    { 
     get { return _threads.Count; } 
    } 

    /// <summary> 
    /// Cleans up the scheduler by indicating that no more tasks will be queued. 
    /// This method blocks until all threads successfully shutdown. 
    /// </summary> 
    public void Dispose() 
    { 
     if (_tasks != null) 
     { 
     // Indicate that no new tasks will be coming in 
     _tasks.CompleteAdding(); 

     // Wait for all threads to finish processing tasks 
     foreach (var thread in _threads) thread.Join(); 

     // Cleanup 
     _tasks.Dispose(); 
     _tasks = null; 
     } 
    } 
    } 
} 
+0

谢谢你的Ade。我也发现将lamda中的代码分隔成可测试的方法是最好的方法。在某些地方,我有一个构造函数重载,它需要一个bool来指示同步或异步操作。这种类型打破了我的类型的用户隐藏实现的目标,但目前似乎是一个很好的折衷。 – 2010-09-06 07:15:21

1

public class Foo 
{ 
    public Foo() 
    { 
     Task.Factory.StartNew(() => somethingLong()) 
      .ContinueWith(a => Bar = 1) ; 
    } 
} 

[Test] public void Foo_should_set_Bar_to_1() 
{ 
    Foo foo; 
    Task.Factory.ContinueWhenAll(
     new [] { 
      new Task(() => { 
       foo = new Foo(); 
      }) 
     }, 
     asserts => { 
      Assert.Equal(1, foo.Bar) ; 
     } 
    ).Wait; 
} 

希望听到这种方法的一些反馈。

相关问题