2011-10-28 33 views

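Expensive asynchronous reading of response stream

I have been trying to learn F# for the past few days, and I keep running into something that puzzles me. My "learning project" is a screen scraper for some data I'm interested in manipulating.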

The F# PowerPack has a call, Stream.AsyncReadToEnd. I did not want to pull in the PowerPack just for that single call, so I took a look at how they did it.

module Downloader =
    open System
    open System.IO
    open System.Net
    open System.Collections

    type public BulkDownload(uriList : IEnumerable) =
        member this.UriList with get() = uriList

        member this.ParalellDownload() =
            let Download (uri : Uri) = async {
                let UnblockViaNewThread f = async {
                    do! Async.SwitchToNewThread()
                    let res = f()
                    do! Async.SwitchToThreadPool()
                    return res }

                let request = HttpWebRequest.Create(uri)
                let! response = request.AsyncGetResponse()
                use responseStream = response.GetResponseStream()
                use reader = new StreamReader(responseStream)
                let! contents = UnblockViaNewThread (fun() -> reader.ReadToEnd())
                return uri, contents.ToString().Length }

            this.UriList
            |> Seq.cast
            |> Seq.map Download
            |> Async.Parallel
            |> Async.RunSynchronously

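They have that function UnblockViaNewThread. Is that really the only way to read the response stream asynchronously? Isn't creating a new thread really expensive (I have seen the "~1 MB of memory" figure thrown around all over the place)? Is there a better way to do this? Is this what really happens in every Async* call (every one that I can let!)?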

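EDIT: I followed Tomas's suggestions and came up with something that works independently of F# PowerTools. Here it is. It still needs error handling, but it requests each URL asynchronously and downloads it into a byte array.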

namespace Downloader
open System
open System.IO
open System.Net
open System.Collections

type public BulkDownload(uriList : IEnumerable) =
    member this.UriList with get() = uriList

    member this.ParalellDownload() =
        let Download (uri : Uri) = async {
            let processStreamAsync (stream : Stream) = async {
                let outputStream = new MemoryStream()
                let buffer = Array.zeroCreate<byte> 0x1000
                let completed = ref false
                while not (!completed) do
                    let! bytesRead = stream.AsyncRead(buffer, 0, 0x1000)
                    if bytesRead = 0 then
                        completed := true
                    else
                        outputStream.Write(buffer, 0, bytesRead)
                stream.Close()
                return outputStream.ToArray() }

            let request = HttpWebRequest.Create(uri)
            let! response = request.AsyncGetResponse()
            use responseStream = response.GetResponseStream()
            let! contents = processStreamAsync responseStream
            return uri, contents.Length }

        this.UriList
        |> Seq.cast
        |> Seq.map Download
        |> Async.Parallel
        |> Async.RunSynchronously

    override this.ToString() = String.Join(", ", this.UriList)
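
The error handling mentioned above could be sketched with Async.Catch, which turns an exception raised inside a workflow into a Choice value, so that one failed download does not abort the whole Async.Parallel batch. This is only a sketch: downloadAll and fakeFetch are hypothetical names, and fakeFetch stands in for the real download so the example runs without network access.

```fsharp
open System

// Hypothetical helper: runs `fetch` for every URI in parallel and
// keeps failures as values instead of letting them propagate.
let downloadAll (fetch : Uri -> Async<byte[]>) (uris : seq<Uri>) =
    uris
    |> Seq.map (fun uri -> async {
        let! result = fetch uri |> Async.Catch
        match result with
        | Choice1Of2 bytes -> return uri, Ok bytes.Length
        | Choice2Of2 ex -> return uri, Error ex.Message })
    |> Async.Parallel
    |> Async.RunSynchronously

// Stand-in for a real download (assumption): fails for one host.
let fakeFetch (uri : Uri) = async {
    if uri.Host = "bad.example" then failwith "connection refused"
    return Array.zeroCreate<byte> 10 }

let results =
    downloadAll fakeFetch [ Uri "http://good.example/"; Uri "http://bad.example/" ]
```

In the class above, the same wrapping could be applied to Download before it is passed to Async.Parallel.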

Answers


I believe that an AsyncReadToEnd that just calls ReadToEnd synchronously on a separate thread is wrong.
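
On the question of whether every Async* call costs a thread: operations such as AsyncGetResponse and Stream.AsyncRead are generally built on the .NET Begin/End pattern rather than on a dedicated thread, so nothing needs to be blocked while the I/O is pending. A minimal sketch of such a wrapper using Async.FromBeginEnd follows; asyncReadChunk is a hypothetical name, and a MemoryStream stands in for the response stream.

```fsharp
open System.IO

// Hypothetical helper: wraps Stream.BeginRead/EndRead in an Async<int>.
// The workflow resumes from the completion callback instead of
// parking a thread on a blocking Read call.
let asyncReadChunk (stream : Stream) (buffer : byte[]) =
    Async.FromBeginEnd(buffer, 0, buffer.Length, stream.BeginRead, stream.EndRead)

// A MemoryStream stands in for a network stream in this sketch.
let demoStream = new MemoryStream([| 1uy; 2uy; 3uy |])
let demoBuffer = Array.zeroCreate<byte> 16
let bytesRead = asyncReadChunk demoStream demoBuffer |> Async.RunSynchronously
printfn "read %d bytes" bytesRead
```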

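The F# PowerPack also contains an AsyncStreamReader type, which provides a proper asynchronous implementation of stream reading. It has a ReadLine method that (asynchronously) returns the next line and downloads only a few chunks from the source stream (using the asynchronous ReadAsync rather than running on a background thread).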

let processStreamAsync stream = async {
    use asyncReader = new AsyncStreamReader(stream)
    let completed = ref false
    while not (!completed) do
        // Asynchronously get the next line
        let! nextLine = asyncReader.ReadLine()
        if nextLine = null then completed := true
        else
            (* process the next line *)
            () }
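
The same line-by-line loop can also be written without the PowerPack type. The following is a minimal sketch, assuming .NET 4.5 or later, where StreamReader.ReadLineAsync returns a Task that Async.AwaitTask can await. The names processLinesAsync and handleLine are hypothetical, and a recursive loop replaces the mutable flag.

```fsharp
open System.IO
open System.Text

// Hypothetical helper: calls handleLine for each line and returns
// the number of lines seen, without blocking a thread on I/O.
let processLinesAsync (stream : Stream) (handleLine : string -> unit) = async {
    use reader = new StreamReader(stream)
    let rec loop count = async {
        // ReadLineAsync yields null once the end of the stream is reached
        let! line = reader.ReadLineAsync() |> Async.AwaitTask
        if isNull line then
            return count
        else
            handleLine line
            return! loop (count + 1) }
    return! loop 0 }

// A MemoryStream stands in for the response stream in this sketch.
let sample = new MemoryStream(Encoding.UTF8.GetBytes "first\nsecond\nthird")
let seen = ResizeArray<string>()
let lineCount = processLinesAsync sample seen.Add |> Async.RunSynchronously
```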

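If you want to download the whole content as a string (instead of processing it line by line), you can use the ReadToEnd method of AsyncStreamReader. This is a proper asynchronous implementation: it starts downloading a block of data (asynchronously) and repeats this without blocking.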

async { 
    use asyncReader = new AsyncStreamReader(stream) 
    return! asyncReader.ReadToEnd() } 
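
On .NET 4.5 and later, a similar effect is available without the PowerPack: StreamReader.ReadToEndAsync returns a Task<string>, which Async.AwaitTask converts into an F# async. A minimal sketch, in which readAllTextAsync is a hypothetical name and the MemoryStream stands in for a response stream:

```fsharp
open System.IO
open System.Text

// Hypothetical helper: reads the whole stream as text asynchronously.
let readAllTextAsync (stream : Stream) = async {
    use reader = new StreamReader(stream)
    return! reader.ReadToEndAsync() |> Async.AwaitTask }

// A MemoryStream stands in for the response stream in this sketch.
let body = new MemoryStream(Encoding.UTF8.GetBytes "<html>hello</html>")
let text = readAllTextAsync body |> Async.RunSynchronously
printfn "%d characters" text.Length
```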

Also, the F# PowerPack is open source and has a permissive license, so often the best way to use it is simply to copy the few files you need into your project.


This answers my question completely. Thanks, Tomas.