2015-02-08 63 views
3

我试图创建一个TaskScheduler,它按顺序运行所有任务,但只会“完成”最近计划的任务。例如,如果我使用它来计划任务A,那么在它完成计划任务B和C之前,我只希望C被认为是成功的。 A可以继续工作,但在完成时应被视为“取消”,B应在开始前被标记为取消。取消TaskScheduler中的TPL任务

我已经有了在线程池上按顺序执行委托的现有代码,并且管理最多有2个排队任务的想法 - 一个当前正在执行,另一个是下一个。缺少的部分是能够将任务的结果状态设置为取消

不幸的是,从TaskScheduler内部看来,您实际上几乎不能访问Task或任何CancellationToken的状态。

我试图通过跟踪最后排队的任务来解决这个问题,并且在执行任务时抛出TaskCancelledException,如果它不等于最后排队的任务,但那不会似乎工作。我想这是因为这个异常不会被抛入任务的委托中,而所有'魔术'实际上都是在TryExecuteTask()内部处理的。

下面是我有:

public class CurrentPendingTaskScheduler : TaskScheduler 
     { 
     private readonly ThreadSafeCurrentPendingQueueProcessor<Task> _Processor; 
     private Task _LastTask; 

     public CurrentPendingTaskScheduler() 
      { 
      _Processor = new ThreadSafeCurrentPendingQueueProcessor<Task>(); 
      _Processor.Process += _Processor_Process; 
      } 

     private void _Processor_Process(Task obj) 
      { 
      // If there's a newer task already, cancel this one before starting 
      if (obj != _LastTask) 
       throw new TaskCanceledException(obj); 

      TryExecuteTask(obj); 

      // If a newer task was added whilst we worked, cancel this one 
      if (obj != _LastTask) 
       throw new TaskCanceledException(obj); 
      } 

     protected override void QueueTask(Task task) 
      { 
      _LastTask = task; 
      _Processor.Enqueue(task); 
      } 

     protected override Boolean TryExecuteTaskInline(Task task, Boolean taskWasPreviouslyQueued) 
      { 
      return false; 
      } 

     protected override IEnumerable<Task> GetScheduledTasks() 
      { 
      throw new NotImplementedException(); 
      } 
     } 

ThreadSafeCurrentPendingQueueProcessor<>类是通过事件回调,以处理在单个后台线程排队的项目,只允许一个活动项目,一个悬而未决的一个帮手项目。

如果'最后的任务'在处理器回调之前发生了变化,则异常只会阻止任务运行(但不会影响其状态)。如果回调确实运行,但在此期间“最后的任务”已经改变,那么在任何延续已经开始之后,异常就会被抛得太晚。

另外我不确定是否有这个原因,但是第一次使用调度程序(我为每个UI元素单击安排一个任务),QueueTask被调用一次,并带有新任务。然而,对于后续的每个调度,它都被调用两次。这会让事情变得更糟,因为_LastTask被覆盖。

我觉得TaskCompletionSource<>可能有一定的用处,但看不出如何。

是否可以实现按照描述工作的TaskScheduler?我知道我可以在调度程序之外实现这种行为,也就是在创建任务的时候,但我需要在很多地方使用它,并且试图通过将它放入可重用的调度程序中来使生活更轻松。

+2

我不认为你可以做到这一点。 “TaskScheduler”的工作是决定什么时候,什么时候执行一个Task,但不知道结果会是什么。 – svick 2015-05-06 21:36:56

回答

0

我会创建一个helper类,它接受一个输入操作,启动它,取消现有的操作,并覆盖它的内部变量。由于您不能直接在Task<T>上执行Cancel(),因此您需要保留自己的TaskCancellationSource方便。如果您想提供外部令牌,则可以将它们与CancellationTokenSource.CreateLinkedTokenSource(...)结合使用。如果您需要密切关注结果,那么这将成为TaskCompletionSource的良机。

public class OverwriteTaskHandler<T> 
{ 
    private Task<T> _task; 
    private TaskCompletionSource<T> _tcs; 
    private CancellationTokenSource _cts; 

    public OverwriteTaskHandler(Func<T> operation) 
    { 
     _tcs = new TaskCompletionSource<T>(); 
     _cts = new CancellationTokenSource(); 
     TryPushTask(operation); 
    } 

    public bool TryPushTask(Func<T> operation) 
    { 
     if (_tcs.Task.IsCompleted) 
      return false; //It would be unsafe to use this instance as it is already "finished" 
     _cts.Cancel(); 
     _cts = new CancellationTokenSource(); 
     _task = Task.Run(operation, _cts.Token); 
     _task.ContinueWith(task => _tcs.SetResult(task.Result)); 
     return true; 
    } 

    public void Cancel() 
    { 
     _cts.Cancel(); 
    } 

    public Task<T> WrappedTask { get { return _tcs.Task; } } 
} 

Discalimer:我没有测试过这一点,所以才仔细检查!

0

AFAIK TaskScheduler不能用于完成这项工作。你真的需要别的东西。例如,你可以自己写一个辅助类有以下用途:

static CurrentPendingTaskContext ctx = ...; 

async Task MyAsyncFunc() { 
await ctx.RegisterAndMaybeCancel(); 
try { 
    //rest of method 
} 
finally { 
    ctx.NotifyCompletion(); 
} 
} 

RegisterAndMaybeCancel会等到当前运行的任务就完成了。如果这个特定的任务已经被另一个任务取代,它会抛出取消异常。

我现在没有时间来实现这个类(尽管它很诱人)。但我认为这种语法模式非常简单,您可以在很多地方使用它。

您还可以使用IDisposable模式这个摆脱了最后:

async Task MyAsyncFunc() { 
using (await ctx.RegisterAndMaybeCancel()) 
{ 
     //rest of method 
} 
}