2017-04-11
2

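Adding items to a ConcurrentBag used in a Parallel.ForEach (C#)

I want to crawl several URLs concurrently. Each request may add more URLs to a ConcurrentBag to be crawled. At the moment I have a nasty while (true) loop that starts a new Parallel.ForEach to process any new URLs.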

Is there any way to add to the contents of the ConcurrentBag so that Parallel.ForEach sees the new items and keeps iterating over them?

ConcurrentBag<LinkObject> URLSToCheck = new ConcurrentBag<LinkObject>(); 

while (true) 
{ 
    Parallel.ForEach(URLSToCheck, new ParallelOptions { MaxDegreeOfParallelism = 5 }, URL => 
    { 
        Checker Checker = new Checker();

        URLDownloadResult result = Checker.downloadFullURL(URL.destinationURL);

        List<LinkObject> URLsToAdd = Checker.findInternalUrls(URL.sourceURL, result.html);

        foreach (var URLToAdd in URLsToAdd)
        {
            URLSToCheck.Add(new LinkObject { sourceURL = URLToAdd.sourceURL, destinationURL = URLToAdd.destinationURL });
        }
    });

    if (URLSToCheck.Count == 0) break;
} 
+0

Looking into recursive code might help; this is a typical case where it applies. By the way, beware of circular references. – Stefan

+0

Thanks, I'll check it out! :-) – jamie

Answers

2

TPL Dataflow can come in handy here. This can be done nicely with an ActionBlock:

// Requires: using System.Threading.Tasks.Dataflow;
// Declare the variable first so the lambda below can capture it and post back to the same block
ActionBlock<LinkObject> actionBlock = null;

actionBlock = new ActionBlock<LinkObject>(URL =>
{
    Checker Checker = new Checker();
    URLDownloadResult result = Checker.downloadFullURL(URL.destinationURL);
    List<LinkObject> URLsToAdd = Checker.findInternalUrls(URL.sourceURL, result.html);
    // Post returns bool, so it is wrapped in a lambda; the bare method group does not convert to Action<T>
    URLsToAdd.ForEach(link => actionBlock.Post(link));
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

Then post your initial URLs to the actionBlock:

actionBlock.Post(url1); 
actionBlock.Post(url2); 
... 
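
The snippet above does not show how to tell when the crawl has finished, or how to avoid the circular references mentioned in the comments. Here is a minimal sketch of one way to handle both, assuming a counter of in-flight URLs and a visited set. The `Links` dictionary is a hypothetical in-memory stand-in for `downloadFullURL` and `findInternalUrls`, and `DataflowCrawlSketch`, `CrawlAsync`, and `Enqueue` are illustrative names, not part of the original answer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowCrawlSketch
{
    // Hypothetical link graph standing in for the real download/parse step.
    // It contains a cycle (a -> b -> a) on purpose.
    static readonly Dictionary<string, string[]> Links = new Dictionary<string, string[]>
    {
        ["a"] = new[] { "b", "c" },
        ["b"] = new[] { "a", "d" },
        ["c"] = new string[0],
        ["d"] = new string[0],
    };

    public static async Task<ICollection<string>> CrawlAsync(string startUrl)
    {
        var visited = new ConcurrentDictionary<string, bool>();
        int pending = 0; // URLs posted but not yet fully processed
        ActionBlock<string> block = null;

        void Enqueue(string url)
        {
            if (!visited.TryAdd(url, true)) return; // already seen: breaks cycles
            Interlocked.Increment(ref pending);
            block.Post(url);
        }

        block = new ActionBlock<string>(url =>
        {
            try
            {
                foreach (var next in Links[url]) Enqueue(next);
            }
            finally
            {
                // Children are counted before this decrement, so the counter
                // only reaches zero once no work is queued or running.
                if (Interlocked.Decrement(ref pending) == 0) block.Complete();
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

        Enqueue(startUrl);
        await block.Completion; // finishes after Complete() and the queue drains
        return visited.Keys;
    }
}
```

Calling `await DataflowCrawlSketch.CrawlAsync("a")` visits each of the four sample URLs once. In a real crawler the body of the block would call the `Checker` methods instead of reading `Links`.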
+0

Thanks, this really helped :-) If anyone else uses this, install Microsoft.Tpl.Dataflow via NuGet. – jamie

3

You could take a look at BlockingCollection.

BlockingCollection provides an implementation of the producer/consumer pattern: your producers add to the blocking collection, and your Parallel.ForEach consumes from it.

To do this, you will have to implement a custom partitioner for the BlockingCollection (the reason is explained here: https://blogs.msdn.microsoft.com/pfxteam/2010/04/06/parallelextensionsextras-tour-4-blockingcollectionextensions/).

The partitioner:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

class BlockingCollectionPartitioner<T> : Partitioner<T>
{
    private BlockingCollection<T> _collection;

    internal BlockingCollectionPartitioner(BlockingCollection<T> collection)
    {
        if (collection == null)
            throw new ArgumentNullException("collection");
        _collection = collection;
    }

    public override bool SupportsDynamicPartitions
    {
        get { return true; }
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        if (partitionCount < 1)
            throw new ArgumentOutOfRangeException("partitionCount");

        // Every partition enumerates the same consuming enumerable
        var dynamicPartitioner = GetDynamicPartitions();
        return Enumerable.Range(0, partitionCount).Select(_ => dynamicPartitioner.GetEnumerator()).ToArray();
    }

    public override IEnumerable<T> GetDynamicPartitions()
    {
        // Items are removed from the collection as they are handed out
        return _collection.GetConsumingEnumerable();
    }
}

Then you would use it like this:

BlockingCollection<LinkObject> URLSToCheck = new BlockingCollection<LinkObject>(); 

Parallel.ForEach(
    new BlockingCollectionPartitioner<LinkObject>(URLSToCheck),
    new ParallelOptions { MaxDegreeOfParallelism = 5 },
    URL =>
    {
        //....
    });

From another thread, you would add to the URLSToCheck collection:

URLSToCheck.Add(...) 

When you have finished processing URLs, you can call URLSToCheck.CompleteAdding(), and Parallel.ForEach should stop automatically.
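
In the crawler from the question the consumers are also the producers, so it is not obvious when to call CompleteAdding(). Here is a rough sketch of one option, assuming a counter of in-flight URLs that triggers CompleteAdding() when it reaches zero. To keep the example self-contained it swaps the hand-written partitioner for the built-in `Partitioner.Create(..., EnumerablePartitionerOptions.NoBuffering)`, which is a substitution and not what this answer proposes. The `Links` dictionary is a hypothetical stand-in for the real download and link-extraction step, and `BlockingCrawlSketch`, `Crawl`, and `Enqueue` are illustrative names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BlockingCrawlSketch
{
    // Hypothetical link graph standing in for the real download/parse step.
    static readonly Dictionary<string, string[]> Links = new Dictionary<string, string[]>
    {
        ["a"] = new[] { "b", "c" },
        ["b"] = new[] { "a", "d" },
        ["c"] = new string[0],
        ["d"] = new string[0],
    };

    public static ICollection<string> Crawl(string startUrl)
    {
        var visited = new ConcurrentDictionary<string, bool>();
        int pending = 0; // URLs added but not yet fully processed

        using (var queue = new BlockingCollection<string>())
        {
            void Enqueue(string url)
            {
                if (!visited.TryAdd(url, true)) return; // skip URLs already seen
                Interlocked.Increment(ref pending);
                queue.Add(url);
            }

            Enqueue(startUrl);

            // NoBuffering hands out one item at a time instead of chunking,
            // so a worker never sits on items while others are idle.
            var source = Partitioner.Create(
                queue.GetConsumingEnumerable(),
                EnumerablePartitionerOptions.NoBuffering);

            Parallel.ForEach(source, new ParallelOptions { MaxDegreeOfParallelism = 5 }, url =>
            {
                try
                {
                    foreach (var next in Links[url]) Enqueue(next);
                }
                finally
                {
                    // The last worker to finish with nothing left queued
                    // ends the consuming enumerable for everyone.
                    if (Interlocked.Decrement(ref pending) == 0) queue.CompleteAdding();
                }
            });
        }

        return visited.Keys;
    }
}
```

`BlockingCrawlSketch.Crawl("a")` returns once every reachable sample URL has been processed. The visited set also covers the circular-reference concern, since a URL is only ever enqueued once.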