2013-03-01 21 views
0

给定以下代码,是否可以为Task doThing的实例定义调度程序,创建和延续设置?如何计划任务以使它们不同时运行,包括异步延续

我希望能够安排doThing的多个实例,以便它们实际上仅从其他实例运行(即使它们正在等待其他子任务)。

private static async Task doThing(object i) 
    { 
     Console.WriteLine("in do thing {0}", (int)i); 
     await Task.Delay(TimeSpan.FromSeconds(5)); 
     Console.WriteLine("out of do thing {0}", (int)i); 
    } 
    static void Main(string[] args) 
    { 

     CancellationTokenSource source = new CancellationTokenSource(); 
     ConcurrentExclusiveSchedulerPair pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Current); 

     Task Task1 = Task.Factory.StartNew((Func<object,Task>)doThing, 1, source.Token, TaskCreationOptions.AttachedToParent, pair.ExclusiveScheduler).Unwrap(); 
     Task Task2 = Task.Factory.StartNew((Func<object, Task>)doThing, 2, source.Token, TaskCreationOptions.AttachedToParent, pair.ExclusiveScheduler); 
     Task Task3 = doThing(3); 
     Task Task4 = Task.Factory.StartNew(async (i) => 
     { 
      Console.WriteLine("in do thing {0}", (int)i); 
      await Task.Delay(TimeSpan.FromSeconds(5)); 
      Console.WriteLine("out of do thing {0}", (int)i); 
     }, 4, source.Token, TaskCreationOptions.None, pair.ExclusiveScheduler); 
     Task.WaitAll(Task1, Task2, Task3, Task4); 
     Console.ReadKey(); 
     return; 
    } 
+0

你的意思是'Task2'应该在Task1完成后才能启动? – svick 2013-03-01 22:01:56

+0

是的。我知道'.ContinueWith',但我不关心它们执行的顺序,只要它们不同时执行。 – Jake 2013-03-01 22:19:47

+0

你的“非lambda”任务有什么意义?在你上面的例子中,虽然你不使用lambda,但是你在任务中使用委托,这是唯一重要的事情。 Lambdas只是语法糖。 – 2013-03-03 17:15:00

回答

4

TPL TaskSchedulers一次只能看到一个异步方法的同步段,所以你不能简单地用调度器来完成。但你可以做到这一点,使用更高级别的基元。我经常使用的是TPL Dataflow。

首先,安装NuGet包:

Install-Package Microsoft.Tpl.Dataflow 

然后使用此代码:

private static async Task doThing(object i) { 
    Console.WriteLine("in do thing {0}", (int)i); 
    await Task.Delay(TimeSpan.FromSeconds(5)); 
    Console.WriteLine("out of do thing {0}", (int)i); 
} 

static void Main(string[] args) { 
    CancellationTokenSource source = new CancellationTokenSource(); 
    var exclusivityBlock = new ActionBlock<Func<Task>>(f => f(), new ExecutionDataflowBlockOptions { CancellationToken = source.Token }}; 
    exclusivityBlock.Post(() => doThing(1)); 
    exclusivityBlock.Post(() => doThing(2)); 
    exclusivityBlock.Post(() => doThing(3)); 
    exclusivityBlock.Post(
     async() => { 
      Console.WriteLine("in do thing {0}", 4); 
      await Task.Delay(TimeSpan.FromSeconds(5)); 
      Console.WriteLine("out of do thing {0}", 4); 
     }); 
    exclusivityBlock.Complete(); 
    exclusivityBlock.Completion.Wait(); 
    Console.WriteLine("Done"); 
    Console.ReadKey(); 
    return; 
} 

此代码缺少对每个投递工作项目单个任务。如果这是重要的,你可以使用此示例:

internal static class Program { 
    private static async Task doThing(object i) { 
     Console.WriteLine("in do thing {0}", (int)i); 
     await Task.Delay(TimeSpan.FromSeconds(5)); 
     Console.WriteLine("out of do thing {0}", (int)i); 
    } 

    private static void Main(string[] args) { 
     CancellationTokenSource source = new CancellationTokenSource(); 
     var exclusivityBlock = CreateTrackingBlock<Func<Task>>(
      f => f(), new ExecutionDataflowBlockOptions { CancellationToken = source.Token }); 
     var task1 = exclusivityBlock.PostWithCompletion(() => doThing(1)); 
     var task2 = exclusivityBlock.PostWithCompletion(() => doThing(2)); 
     var task3 = exclusivityBlock.PostWithCompletion(() => doThing(3)); 
     var task4 = exclusivityBlock.PostWithCompletion(
      async() => { 
       Console.WriteLine("in do thing {0}", 4); 
       await Task.Delay(TimeSpan.FromSeconds(5)); 
       Console.WriteLine("out of do thing {0}", 4); 
      }); 

     Task.WaitAll(task1, task2, task3, task4); 
     Console.WriteLine("Done"); 
     Console.ReadKey(); 
     return; 
    } 

    private static ActionBlock<Tuple<T, TaskCompletionSource<object>>> CreateTrackingBlock<T>(Func<T, Task> action, ExecutionDataflowBlockOptions options = null) { 
     return new ActionBlock<Tuple<T, TaskCompletionSource<object>>>(
      async tuple => { 
       try { 
        await action(tuple.Item1); 
        tuple.Item2.TrySetResult(null); 
       } catch (Exception ex) { 
        tuple.Item2.TrySetException(ex); 
       } 
      }, 
      options ?? new ExecutionDataflowBlockOptions()); 
    } 

    internal static Task PostWithCompletion<T>(this ActionBlock<Tuple<T, TaskCompletionSource<object>>> block, T value) { 
     var tcs = new TaskCompletionSource<object>(); 
     var tuple = Tuple.Create(value, tcs); 
     block.Post(tuple); 
     return tcs.Task; 
    } 
} 

不过请注意,这仅仅是更费力一些,因为数据流是不是主要设计用于追踪个人意见,而是整个过程。所以虽然上述工作很好,但Stephen Cleary的答案可能更简单,因此更可取。

+0

+1。我学到了一个新的窍门! :) – 2013-03-03 18:27:19

+0

此外,您可以使用块选项来指定一个'TaskScheduler'。从技术上讲,这是一个lambda任务,但它仍然是一个很棒的技术。 – 2013-03-03 22:18:14

+0

是的,在选项中有很多好东西。至于'lambda任务',正如我对这个问题本身所做的评论一样,它是无关紧要的。方法组和lambda任务几乎适用于所有目的。 – 2013-03-03 22:40:51

2

考虑下面的代码,是不是可以定义调度,创建和延续设置任务doThing的实例?

坏消息是:不,没有办法做到这一点。为非lambda任务定义“调度程序”是没有意义的。创建选项不是必需的,继续选项设置在继续,而不是任务本身。

好消息是:你不需要这种行为。

您想异步同步。内置的方式做到这一点是使用SemaphoreSlim,因为这样的:

SemaphoreSlim mutex = new SemaphoreSlim(1); 
private static async Task doThingAsync(object i) 
{ 
    await mutex.WaitAsync(); 
    try 
    { 
     Console.WriteLine("in do thing {0}", (int)i); 
     await Task.Delay(TimeSpan.FromSeconds(5)); 
     Console.WriteLine("out of do thing {0}", (int)i); 
    } 
    finally 
    { 
     mutex.Release(); 
    } 
} 

就个人而言,我觉得finally语法是尴尬,所以我定义IDisposable和使用using代替。

如果您需要更多功率,Stephen Toub有一个async coordination primitives series,我有一个full suite of primitives in my AsyncEx library。这两种资源都包含AsyncLockTask<IDisposable> WaitAsync()成员,因此您可以使用using而不是finally

+0

我不同意,当然有一种方法可以避免任务之间的并发。我会添加一个我自己的答案。 – 2013-03-03 17:12:53