我有一种情况,即不断生成新任务并将其添加到ConcurrentBag<Tasks>
。C#多线程,等待所有任务在新任务不断添加的情况下完成
我需要等待所有任务完成。
通过WaitAll
等待ConcurrentBag
中的所有任务是不够的,因为在完成上一次等待时任务数量会增加。
此刻我等着它以下列方式:
private void WaitAllTasks()
{
while (true)
{
int countAtStart = _tasks.Count();
Task.WaitAll(_tasks.ToArray());
int countAtEnd = _tasks.Count();
if (countAtStart == countAtEnd)
{
break;
}
#if DEBUG
if (_tasks.Count() > 100)
{
tokenSource.Cancel();
break;
}
#endif
}
}
,我不是很高兴与while(true)
解决方案。
任何人都可以提出一个更好更有效的方式来做到这一点(而不必用while(true)
不断汇集处理器)
其他方面的信息,在意见中的要求。我不认为这与这个问题有关。
这段代码用于网络爬虫。爬虫扫描页面内容并查找两种类型的信息。数据页面和链接页面。数据页面将被扫描并收集数据,链接页面将被扫描,更多的链接将从他们那里收集。
由于每个任务都会执行活动并找到更多链接,因此会将链接添加到EventList
。列表(以下代码)上有一个事件OnAdd
,用于触发其他任务以扫描新添加的URL。等等。
当没有更多正在运行的任务(因此不会再添加链接)并且所有项目都已处理完成时,作业完成。
public IEventList<ISearchStatus> CurrentLinks { get; private set; }
public IEventList<IDataStatus> CurrentData { get; private set; }
public IEventList<System.Dynamic.ExpandoObject> ResultData { get; set; }
private readonly ConcurrentBag<Task> _tasks = new ConcurrentBag<Task>();
private readonly CancellationTokenSource tokenSource = new CancellationTokenSource();
private readonly CancellationToken token;
public void Search(ISearchDefinition search)
{
CurrentLinks.OnAdd += UrlAdded;
CurrentData.OnAdd += DataUrlAdded;
var status = new SearchStatus(search);
CurrentLinks.Add(status);
WaitAllTasks();
_exporter.Export(ResultData as IList<System.Dynamic.ExpandoObject>);
}
private void DataUrlAdded(object o, EventArgs e)
{
var item = o as IDataStatus;
if (item == null)
{
return;
}
_tasks.Add(Task.Factory.StartNew(() => ProcessObjectSearch(item), token));
}
private void UrlAdded(object o, EventArgs e)
{
var item = o as ISearchStatus;
if (item==null)
{
return;
}
_tasks.Add(Task.Factory.StartNew(() => ProcessFollow(item), token));
_tasks.Add(Task.Factory.StartNew(() => ProcessData(item), token));
}
public class EventList<T> : List<T>, IEventList<T>
{
public EventHandler OnAdd { get; set; }
private readonly object locker = new object();
public new void Add(T item)
{
//lock (locker)
{
base.Add(item);
}
OnAdd?.Invoke(item, null);
}
public new bool Contains(T item)
{
//lock (locker)
{
return base.Contains(item);
}
}
}
我不确定你在做什么,这可能不是你问题的最佳解决方案,但你应该看看TPL Dataflow。它允许您创建异步管道。 – john
为什么新任务不断产生?你为什么把它们添加到'ConcurrentBag'中?为什么你需要等待所有任务完成? – PJvG
如果你想等待所有的任务完成,那么为什么你的条件'countAtStart == countAtEnd'而不是'countAtEnd == 0'? – Servy