2016-11-17 88 views
0

我有api从服务器下载单个mp3文件,这是使用RxJava作为波纹管消耗的。RxAndroid下载多个文件,最多3个并发线程

Observable<ResponseBody> observable = audioService.getFile(fileNameWithExtension); 
     observable.subscribeOn(Schedulers.newThread()) 
       .observeOn(Schedulers.newThread()) 
       .subscribe(someCallBackClass<ResponseBody>); 

这只是下载单个文件,回调将文件保存在磁盘上。 我想下载文件列表,将每个文件保存在磁盘上,并等待所有下载完成,最多3个调用应该并行执行。 如何用RXAndroid做到这一点,我尝试了flatmap,但我无法完全理解它。

编辑新代码

List<Observable<Response<ResponseBody>>> audioFiles = new ArrayList<>(); 

    for (String fileNameWithExtension : fileNamesWithExtension) { 
     Observable<Response<ResponseBody>> observable = restFactory.getAudioService().getFile(fileNameWithExtension); 
     audioFiles.add(observable); 
    } 

    Observable.from(audioFiles).flatMap(audioFile -> Observable.fromCallable(() -> { 
     audioFile.subscribeOn(Schedulers.io()) 
       .observeOn(AndroidSchedulers.mainThread()) 
       .toBlocking() 
       .subscribe(new CallBackWithErrorHandling<>(Downloader.this)); 
     return 0; 
    }).subscribeOn(Schedulers.io()), MAX_CONCURRENT) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(new Subscriber<Integer>() { 
       @Override 
       public void onCompleted() { 
        goToMainActivity(); 
       } 

       @Override 
       public void onError(Throwable e) { 
        Log.e(TAG, "Something went wrong , " + Thread.currentThread().getName()); 
        Log.e(TAG, "Something went wrong , " + e.toString()); 
        showToast(R.string.something_went_wrong); 
        goToMainActivity(); 
       } 

       @Override 
       public void onNext(Integer integer) { 
       } 
      }); 

,这是工作的罚款,但是当网络出现故障或互联网连接速度较慢我得到

java.lang.RuntimeException: Can't create handler inside thread that has not called Looper.prepare() 

我无法了解哪些线正好需要observeOn() android主线程。

回答

2

您可以flatMap实现这一目标,限制了它的并发性,还需要在后台调度,做文件传输内可观测运行:

fileNames 
.flatMap(name -> { 
     return Observable.fromCallable(() -> { 
      // put your blocking download code here, save the data 
      return name; // return what you need down below 
     }) 
     .subscribeOn(Schedulers.io()); 
}, 3) 
.observeOn(AndroidSchedulers.mainThread()) 
.subscribe(completedFile -> { }, error -> { }, 
    () -> { /* all completed.*/ }); 

编辑:

既然你使用Observable API进行网络下载,您无需阻止:

Observable.from(audioFiles) 
.flatMap(audioFile -> 
    audioFile.subscribeOn(Schedulers.io()), // <-- apply extra transforms here 
    MAX_CONCURRENT) 
.observeOn(AndroidSchedulers.mainThread()) 
.subscribe(completedFile -> { }, error -> { }, 
    () -> { /* all completed.*/ }) 

目前尚不清楚尽管你用CallBackWithErrorHandling做了什么。