0

我有一种情况,即不断生成新任务并将其添加到ConcurrentBag<Tasks>C#多线程,等待所有任务在新任务不断添加的情况下完成

我需要等待所有任务完成。

通过WaitAll等待ConcurrentBag中的所有任务是不够的,因为在完成上一次等待时任务数量会增加。

此刻我等着它以下列方式:

private void WaitAllTasks() 
{ 
    while (true) 
    { 
     int countAtStart = _tasks.Count(); 
     Task.WaitAll(_tasks.ToArray()); 

     int countAtEnd = _tasks.Count(); 
     if (countAtStart == countAtEnd) 
     { 
      break; 
     } 

     #if DEBUG 
     if (_tasks.Count() > 100) 
     { 
      tokenSource.Cancel(); 
      break; 
     } 
     #endif 
    } 
} 

,我不是很高兴与while(true)解决方案。

任何人都可以提出一个更好更有效的方式来做到这一点(而不必用while(true)不断汇集处理器)


其他方面的信息,在意见中的要求。我不认为这与这个问题有关。

这段代码用于网络爬虫。爬虫扫描页面内容并查找两种类型的信息。数据页面和链接页面。数据页面将被扫描并收集数据,链接页面将被扫描,更多的链接将从他们那里收集。

由于每个任务都会执行活动并找到更多链接,因此会将链接添加到EventList。列表(以下代码)上有一个事件OnAdd,用于触发其他任务以扫描新添加的URL。等等。

当没有更多正在运行的任务(因此不会再添加链接)并且所有项目都已处理完成时,作业完成。

public IEventList<ISearchStatus> CurrentLinks { get; private set; } 
public IEventList<IDataStatus> CurrentData { get; private set; } 
public IEventList<System.Dynamic.ExpandoObject> ResultData { get; set; } 
private readonly ConcurrentBag<Task> _tasks = new ConcurrentBag<Task>(); 

private readonly CancellationTokenSource tokenSource = new CancellationTokenSource(); 
private readonly CancellationToken token; 

public void Search(ISearchDefinition search) 
{ 
    CurrentLinks.OnAdd += UrlAdded; 
    CurrentData.OnAdd += DataUrlAdded; 

    var status = new SearchStatus(search); 

    CurrentLinks.Add(status); 

    WaitAllTasks(); 

    _exporter.Export(ResultData as IList<System.Dynamic.ExpandoObject>); 
} 

private void DataUrlAdded(object o, EventArgs e) 
{ 
    var item = o as IDataStatus; 
    if (item == null) 
    { 
     return; 
    } 

    _tasks.Add(Task.Factory.StartNew(() => ProcessObjectSearch(item), token)); 
} 

private void UrlAdded(object o, EventArgs e) 
{ 
    var item = o as ISearchStatus; 
    if (item==null) 
    { 
     return; 
    } 

    _tasks.Add(Task.Factory.StartNew(() => ProcessFollow(item), token)); 
    _tasks.Add(Task.Factory.StartNew(() => ProcessData(item), token)); 
} 

public class EventList<T> : List<T>, IEventList<T> 
{ 
    public EventHandler OnAdd { get; set; } 
    private readonly object locker = new object(); 
    public new void Add(T item) 
    { 
     //lock (locker) 
     { 
      base.Add(item); 
     } 
     OnAdd?.Invoke(item, null); 
    } 

    public new bool Contains(T item) 
    { 
     //lock (locker) 
     { 
      return base.Contains(item); 
     } 
    } 
} 
+5

我不确定你在做什么,这可能不是你问题的最佳解决方案,但你应该看看TPL Dataflow。它允许您创建异步管道。 – john

+0

为什么新任务不断产生?你为什么把它们添加到'ConcurrentBag'中?为什么你需要等待所有任务完成? – PJvG

+0

如果你想等待所有的任务完成,那么为什么你的条件'countAtStart == countAtEnd'而不是'countAtEnd == 0'? – Servy

回答

0

为什么不编写一个函数,在创建时根据需要生成您的任务?这样你就可以使用Task.WhenAll等待他们完成,或者,我错过了这一点? See this working here

using System; 
using System.Threading.Tasks; 
using System.Collections.Generic; 

public class Program 
{ 
    public static void Main() 
    { 
     try 
     { 
      Task.WhenAll(GetLazilyGeneratedSequenceOfTasks()).Wait(); 
      Console.WriteLine("Fisnished."); 
     } 
     catch (Exception ex) 
     { 
      Console.WriteLine(ex); 
     } 
    } 

    public static IEnumerable<Task> GetLazilyGeneratedSequenceOfTasks() 
    { 
     var random = new Random(); 
     var finished = false; 
     while (!finished) 
     { 
      var n = random.Next(1, 2001); 
      if (n < 50) 
      { 
       finished = true; 
      } 

      if (n > 499) 
      { 
       yield return Task.Delay(n); 
      } 

      Task.Delay(20).Wait();    
     } 

     yield break; 
    } 
} 

另外,如果你的问题是不是微不足道的我的答案可能暗示,我会考虑用TPL Dataflow的网格。 BufferBlockActionBlock的组合会让你非常接近你所需要的。你可以start here


无论哪种方式,我建议你要包括接受CancellationToken或两个的规定。

+0

我认为你提供的代码将不起作用,因为可能会添加一些额外的任务,并且枚举将抛出。 – VMAtm

+0

@VMAtm我的想法是,在想要完成序列之前不要'突破'。然而,正如我所说的,对于任何非平凡的解决方案来说,调查TPL Dataflow。 – Jodrell

+0

这不起作用,因为任务列表随产量发生而被修改。 –

0

我认为这项任务可以用TPL Dataflow库进行非常基本的设置。你需要一个TransformManyBlock<Task, IEnumerable<DataTask>>ActionBlock(可能是更多的人),实际数据处理,像这样:

// queue for a new urls to parse 
var buffer = new BufferBlock<ParseTask>(); 

// parser itself, returns many data tasks from one url 
// similar to LINQ.SelectMany method 
var transform = new TransformManyBlock<ParseTask, DataTask>(task => 
{ 
    // get all the additional urls to parse 
    var parsedLinks = GetLinkTasks(task); 
    // get all the data to parse 
    var parsedData = GetDataTasks(task); 

    // setup additional links to be parsed 
    foreach (var parsedLink in parsedLinks) 
    { 
     buffer.Post(parsedLink); 
    } 

    // return all the data to be processed 
    return parsedData; 
}); 

// actual data processing 
var consumer = new ActionBlock<DataTask>(s => ProcessData(s)); 

之后,你需要在每个之间的块链接:

buffer.LinkTo(transform, new DataflowLinkOptions { PropagateCompletion = true }); 
transform.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true }); 

现在你有一个很好的管道,它将在后台执行。在你意识到你所需要的每件事都被解析的时候,你只需要调用Complete方法来阻止接受新闻消息。经过buffer成了空的,它将在完成向下的管道传播到transform块,将它传播到消费者(S),你需要等待Completion任务:

// no additional links would be accepted 
buffer.Complete(); 
// after all the tasks are done, this will get fired 
await consumer.Completion; 

您可以检查的那一刻为完成,例如,如果两个bufferCount属性transformInputCounttransform'CurrentDegreeOfParallelism(这是为TransformManyBlock内部属性)等于0

但是,我建议你在这里实现一些额外的逻辑来确定当前的变压器数量,因为使用内部逻辑并不是一个好的解决方案。至于取消流水线,您可以创建一个TPL块,其中包含CancellationToken,可以是所有块中的一个,也可以是每个块专用的块,从中获取取消。