2011-09-08 93 views
4

我有一个类“图像”具有三个属性:URL,ID,内容。 我有10个这样的图像列表。 这是一个silverlight应用程序。并行HttpWebRequests与无扩展

我想创建一个方法:

IObservable<Image> DownloadImages(List<Image> imagesToDownload) 
{ 
    //start downloading all images in imagesToDownload 
    //OnImageDownloaded: 
          image.Content = webResponse.Content 
          yield image 

} 

这种方法开始下载并行所有10张图像。 然后,每个下载完成时,它会将Image.Content到下载的WebResponse.Content。

结果应该是一个的IObservable流与每个下载的图像。

我在RX初学者,我觉得我想可以用ForkJoin达到什么样的,但是这是在反应扩展DLL的实验版本,我不想使用。

而且我真的不喜欢下载的回调计数检测,所有图像下载完毕后,然后调用onCompleted()。

似乎并没有被在Rx精神给我。

我也张贴到目前为止,我什么编码的解决方案,虽然我不喜欢我的解决方案,因为它的长/丑,并使用计数器。

 return Observable.Create((IObserver<Attachment> observer) => 
     { 
      int downloadCount = attachmentsToBeDownloaded.Count; 
       foreach (var attachment in attachmentsToBeDownloaded) 
         { 
          Action<Attachment> action = attachmentDDD => 
          this.BeginDownloadAttachment2(attachment).Subscribe(imageDownloadWebResponse => 
           { 
            try 
            { 
             using (Stream stream = imageDownloadWebResponse.GetResponseStream()) 
             { 
              attachment.FileContent = stream.ReadToEnd(); 
             } 
             observer.OnNext(attachmentDDD); 

             lock (downloadCountLocker) 
             { 
              downloadCount--; 
              if (downloadCount == 0) 
              { 
               observer.OnCompleted(); 
              } 
             } 
            } catch (Exception ex) 
            { 
             observer.OnError(ex); 
            } 
           }); 
          action.Invoke(attachment); 
         } 

         return() => { }; //do nothing when subscriber disposes subscription 
        }); 
      } 

好吧,我确实管理它,使它的工作最终根据吉姆的答案。

var obs = from image in attachmentsToBeDownloaded.ToObservable() 
       from webResponse in this.BeginDownloadAttachment2(image).ObserveOn(Scheduler.ThreadPool) 
       from responseStream in Observable.Using(webResponse.GetResponseStream, Observable.Return) 
       let newImage = setAttachmentValue(image, responseStream.ReadToEnd()) 
       select newImage; 

其中setAttachmentValue只需要`image.Content = bytes;返回图像;

BeginDownloadAttachment2代码:

 private IObservable<WebResponse> BeginDownloadAttachment2(Attachment attachment) 
    { 
     Uri requestUri = new Uri(this.DownloadLinkBaseUrl + attachment.Id.ToString(); 
     WebRequest imageDownloadWebRequest = HttpWebRequest.Create(requestUri); 
     IObservable<WebResponse> imageDownloadObservable = Observable.FromAsyncPattern<WebResponse>(imageDownloadWebRequest.BeginGetResponse, imageDownloadWebRequest.EndGetResponse)(); 

     return imageDownloadObservable; 
    } 
+0

很高兴提供帮助。我不得不说,解决方案看起来比开始的维护容易得多。 –

+0

良好的使用。很好的解决方案。 –

回答

3

怎么样,我们简化了这个有点。把你的图像列表,并将其转换为可观察的。接下来,考虑使用Observable.FromAsyncPattern来管理服务请求。最后使用SelectMany将请求与响应进行协调。我正在做一些关于如何在这里获取文件流的假设。本质上,如果您可以将您的服务请求的BeginInvoke/EndInvoke委托传入到FromAsyncPattern中,那您就很好。

var svcObs = Observable.FromAsyncPattern<Stream>(this.BeginDownloadAttachment2, This.EndDownloadAttchment2); 

var obs = from image in imagesToDownload.ToObservable() 
      from responseStream in svcObs(image) 
      .ObserveOnDispatcher() 
      .Do(response => image.FileContent = response.ReadToEnd()) 
      select image; 
return obs; 
+0

谢谢吉姆。它几乎不错。我遇到的唯一问题是BeginDownloadAttachment2返回一个IObservable 。我尝试了代码转换为:... DO(响应=> Observable.Using(response.GetResponseStream,溪流=> image.FileContent = stream.ReadToEnd())),但我得到一些错误:类型参数不能从使用推断。 –

+0

其实我觉得我找到了:... DO(响应=> Observable.Using(response.GetResponseStream,流=> Observable.Return(stream.ReadToEnd()))做(字节=> image.Value。 FileContent = bytes))。将测试并查看它是否有效。 –

+0

没有工作:( –