2017-02-28 72 views
1

我一直在阅读2个小时,我仍然感到困惑。有人说使用StartNew,有人说Task.Run有人说别的。我知道Task.Run给我一个编译错误。简单的并行任务及延续

我需要并行启动多个任务,然后当每个任务成功完成时执行一个延续任务。知道什么时候所有的阻塞都会有帮助。

以下是我有:

public void DoSomeWork(object workItem) 
    { 
     var tasks = new Task<ResultArgs>[_itemList.Count]; 

     for (int loopCnt = 0; loopCnt < _itemList.Count; loopCnt++) 
     { 
      tasks[loopCnt] = new Task<ResultArgs>.Run(() => 
      { 
       return _itemList[loopCnt].Analyze(workItem); 
      }); 
      tasks[loopCnt].ContinueWith(ReportResults, TaskContinuationOptions.ExecuteSynchronously); 
     } 
    } 

的编译表示运行中不存在的任务。

很明显,我有一些东西跑,但我不知道是什么。

我该如何解决这个问题?

+2

'Task.Run'只在4.5中存在,而不在4.0中。 – VMAtm

回答

2

您可以使用async方法,也可以将项目流入数据流,以下代码使用Tpl-dataflow来处理项目,将它们传递到第二个处理步骤,最后等待处理完成。

using NUnit.Framework; 
using System; 
using System.Collections.Concurrent; 
using System.Linq; 
using System.Threading.Tasks; 
using System.Threading.Tasks.Dataflow; 

namespace AsyncProcessing { 

    [TestFixture] 
    public class PipelineTests { 

     [Test] 
     public async Task RunPipeline() { 
      var pipeline = new MyPipeline(); 
      var data = Enumerable.Range(0, 1000).Select(x => new WorkItem(x, x)); 

      foreach(var item in data) { 
       await pipeline.SendAsync(item); 
      } 

      pipeline.Complete(); 
      await pipeline.Completion; 

      //all processing complete    
     } 
    } 

    class MyPipeline { 

     private BufferBlock<WorkItem> inputBuffer; 
     private TransformBlock<WorkItem, WorkItem> analyzeBlock; 
     private TransformBlock<WorkItem, ResultArg> reportBlock; 
     private ActionBlock<ResultArg> postOutput; 

     public ConcurrentBag<ResultArg> OutputBuffer { get; } 
     public Task Completion { get { return postOutput.Completion; } } 

     public MyPipeline() { 
      OutputBuffer = new ConcurrentBag<ResultArg>(); 
      CreatePipeline(); 
      LinkPipeline(); 
     } 

     public void Complete() { 
      inputBuffer.Complete(); 
     } 

     public async Task SendAsync(WorkItem data) { 
      await inputBuffer.SendAsync(data); 
     } 

     public void CreatePipeline() { 
      var options = new ExecutionDataflowBlockOptions() { 
       MaxDegreeOfParallelism = Environment.ProcessorCount, 
       BoundedCapacity = 10 
      }; 

      inputBuffer = new BufferBlock<WorkItem>(options); 

      analyzeBlock = new TransformBlock<WorkItem, WorkItem>(item => { 
       //Anylyze item.... 
       return item; 
      }, options); 

      reportBlock = new TransformBlock<WorkItem, ResultArg>(item => { 
       //report your results, email.. db... etc. 
       return new ResultArg(item.JobId, item.WorkValue); 
      }, options); 

      postOutput = new ActionBlock<ResultArg>(item => { 
       OutputBuffer.Add(item); 
      }, options); 
     } 

     public void LinkPipeline() { 
      var options = new DataflowLinkOptions() { 
       PropagateCompletion = true, 
      }; 

      inputBuffer.LinkTo(analyzeBlock, options); 
      analyzeBlock.LinkTo(reportBlock, options); 
      reportBlock.LinkTo(postOutput, options); 
     } 
    } 

    public class WorkItem { 

     public int JobId { get; set; } 
     public int WorkValue { get; set; } 

     public WorkItem(int id, int workValue) { 
      this.JobId = id; 
      this.WorkValue = workValue; 
     } 
    } 

    public class ResultArg { 

     public int JobId { get; set; } 
     public int Result { get; set; } 

     public ResultArg(int id, int result) { 
      this.JobId = id; 
      this.Result = result; 
     } 
    } 
} 
+0

我最终使用了异步方法的建议。由于编译器不是大惊小怪,所以我认为它是正确的。我们会在什么时候开始测试!我也会研究我以前从未见过的TPL数据流。 – AeroClassics

+0

请参阅数据流中的崩溃课程的编辑:)另请参见数据流介绍(http://blog.stephencleary.com/2012/09/introduction-to-dataflow-part-1.html) – JSteward

+0

谢谢!更多阅读!斯蒂芬的东西通常很好,值得一读。感谢您的链接! – AeroClassics

0

为什么不使用Parallel.ForEach循环。这是用于并行执行的任务,它可以使用多个线程和执行更快 Parallet.Foreach

但是,如果您正在做一些涉及锁定的数据库相关的输入输出操作,它可能会失败。在这种情况下,我会建议在每个任务中保留一个返回类型,并根据前一个任务的返回类型启用ext任务。

+0

我已经给了Parallel.ForEach一个想法,但被警告说它不会把我所有的任务都放在准备运行的队列中。它会每次都限制数量,并且由于近乎实时的限制,我担心吞吐量。 但是,这确实提醒我我需要担心异常! – AeroClassics

+0

@AeroClassics'async'也有类似的限制,如果它使用defalt任务调度程序。 – VMAtm

+0

哦小提琴。这不是我真正想听到的,但我需要知道的。谢谢! – AeroClassics