2012-02-13 45 views
3

尝试使用Async CTP编写HTML搜寻器我已经停滞不前,不知道如何编写无递归方法来完成此任务。使用异步CTP并发下载HTML页面

这是我到目前为止的代码。

private readonly ConcurrentStack<LinkItem> _LinkStack; 
private readonly Int32 _MaxStackSize; 
private readonly WebClient client = new WebClient(); 

Func<string, string, Task<List<LinkItem>>> DownloadFromLink = async (BaseURL, uri) => 
{ 
    string html = await client.DownloadStringTaskAsync(uri); 
    return LinkFinder.Find(html, BaseURL); 
}; 

Action<LinkItem> DownloadAndPush = async (o) => 
{ 
    List<LinkItem> result = await DownloadFromLink(o.BaseURL, o.Href); 
    if (this._LinkStack.Count() + result.Count <= this._MaxStackSize) 
    { 
     this._LinkStack.PushRange(result.ToArray()); 
     o.Processed = true; 
    } 
}; 

Parallel.ForEach(this._LinkStack, (o) => 
{ 
    DownloadAndPush(o); 
}); 

但显然,这并不工作,我会因为那个Parallel.ForEach执行第一个(也是唯一的迭代)的时候希望我只有只有1项。我可以想到的最简单的方法,使ForEach递归,但我不能(我不认为)这样做,因为我会很快用完堆栈空间。

任何人都可以请指导我如何重构此代码,创建我将描述为递归延续,直到达到MaxStackSize或系统内存不足时递增项目?

+0

+1。控制递归的人控制着宇宙! – toddmo 2013-12-14 03:31:12

回答

10

我认为使用C#5/.Net 4.5做这种事情的最好方法是使用TPL Dataflow。甚至有a walkthrough on how to implement web crawler using it

基本上,你创建一个“块”这需要下载一个URL,并从中获得链接的护理:

var cts = new CancellationTokenSource(); 

Func<LinkItem, Task<IEnumerable<LinkItem>>> downloadFromLink = 
    async link => 
      { 
       // WebClient is not guaranteed to be thread-safe, 
       // so we shouldn't use one shared instance 
       var client = new WebClient(); 
       string html = await client.DownloadStringTaskAsync(link.Href); 

       return LinkFinder.Find(html, link.BaseURL); 
      }; 

var linkFinderBlock = new TransformManyBlock<LinkItem, LinkItem>(
    downloadFromLink, 
    new ExecutionDataflowBlockOptions 
    { MaxDegreeOfParallelism = 4, CancellationToken = cts.Token }); 

您可以设置MaxDegreeOfParallelism你想要的任何值。它至多说可以同时下载多少个URL。如果你不想限制它,你可以将它设置为DataflowBlockOptions.Unbounded

然后,您创建一个块,以某种方式处理所有下载的链接,如将它们全部存储在列表中。它也可以决定何时取消下载:

var links = new List<LinkItem>(); 

var storeBlock = new ActionBlock<LinkItem>(
    linkItem => 
    { 
     links.Add(linkItem); 
     if (links.Count == maxSize) 
      cts.Cancel(); 
    }); 

由于我们没有设定MaxDegreeOfParallelism,则默认为1。这意味着使用集合不是线程安全应该没问题在这里。

我们再创建一个块:它将从linkFinderBlock开始,并将其传递给storeBlock并返回linkFinderBlock

var broadcastBlock = new BroadcastBlock<LinkItem>(li => li); 

其构造函数中的lambda是一个“克隆函数”。如果需要,可以使用它来创建项目的克隆,但在此不需要,因为我们在创建后不修改LinkItem

现在我们可以将模块连接在一起:

linkFinderBlock.LinkTo(broadcastBlock); 
broadcastBlock.LinkTo(storeBlock); 
broadcastBlock.LinkTo(linkFinderBlock); 

然后,我们可以通过给第一项linkFinderBlock开始处理(或broadcastBlock,如果你也想将其发送到storeBlock):

linkFinderBlock.Post(firstItem); 

最后等到处理完毕:

try 
{ 
    linkFinderBlock.Completion.Wait(); 
} 
catch (AggregateException ex) 
{ 
    if (!(ex.InnerException is TaskCanceledException)) 
     throw; 
} 
+0

哇!谢谢你的精彩解释。你能确认一件事吗?如果我们将MaxDegreeOfParallelism设置为大于1的数字,这是否意味着我需要将集合类型更改为类似于ConcurrentStack的东西,因为它是线程安全的? – 2012-02-15 08:59:58

+0

你的意思是'storeBlock'中的集合?你在哪里设置MaxDegreeOfParallelism?如果你将'storeBlock'的'MDOP'设置为> 1,那么你需要在那里使用一些线程安全的集合(或使用锁)。但是,如果将其他块的“MDOP”设置为> 1,则不会影响“storeBlock”的并行性,因此您不需要考虑线程安全性。 – svick 2012-02-15 11:54:36

+0

这个男孩会让我升级到2012! +1 – toddmo 2013-12-14 03:29:14