2014-11-02 74 views
2

帮助撰写多个网络电话并在Rxjava中累积结果。 (我正在使用的Android应用程序。)撰写多个网络电话RxJava - Android

State 
-- List<City> cityList; 

City 
- cityId; 

RestCall 1 
Observable<State> stateRequest = restService.getStates(); 

RestCall 2 
Observable<CityDetail> cityRequest = restService.getCityDetail(cityId); 

在UI我有一种让每个城市的所有细节后显示的城市列表,然后显示在列表视图。 我如何实现parllel网络调用并累积结果。 ?

我想把所有的城市细节结果放在源状态“对象”的列表中。由于国家对象也有一些信息需要进行分析。这是可能的吗?

stateRequest ??? 
.subscribeOn(Schedulers.io()) 
.observeOn(AndroidSchedulers.mainThread()) 
.subscribe(new Subscriber<State>() { 
    @Override 
    public void onCompleted() { 

    } 

    @Override 
    public void onError(Throwable e) { 
    } 

    @Override 
    public void onNext(State result) { 
    // Get city list and display 
    } 
}); 

我检查了这个例子,它显示了我们如何能更多地拉一个可观察的响应。下面的代码片段显示了3个可观察组合。 但在我的情况下,我不得不做20个网络电话并行或顺序(我的意思是在后台,但一个接一个)。我如何做到这一点。任何帮助或指示?

https://gist.github.com/skehlet/9418379

Observable.zip(f3Observable, f4Observable, f5Observable, new Func3<String, Integer, Integer, Map<String, String>>() { 
    @Override 
    public Map<String, String> call(String s, Integer integer, Integer integer2) { 
     Map<String, String> map = new HashMap<String, String>(); 
     map.put("f3", s); 
     map.put("f4", String.valueOf(integer)); 
     map.put("f5", String.valueOf(integer2)); 
     return map; 
    } 

回答

3

我觉得你的代码可以简化为这样的事情,作为您使用zip运营商的接近使用toList运营商的

stateRequest 
.subscribe(State state -> { 
    Observable.from(state.getCityList()) 
       .flatMap(City city -> restService.getCityDetail(city.getId()) 
       .toList() 
       .subscribe(List<City> cities -> { 

        state.clear(); 
        state.addAll(cities); 
       }); 
    }); 

由于RxJava不提供throttle运营商,你可能会构建类似如下的东西:

Observable<City> limiter = Observable.zip(Observable.interval(1, SECONDS), aCity, (i, c) -> c); 

使用此限制器是一种可观察的事物,它将每秒发射一座城市。

所以,用你的代码,如果你想呼叫限制getCityDetail例如:

Observable<Object> limiter = Observable.interval(1, SECONDS); 
stateRequest 
.subscribe(State state -> { 
    Observable.zip(limiter, Observable.from(state.getCityList()), (i, c) -> c) 
       .flatMap(City city -> restService.getCityDetail(city.getId()) 
       .toList() 
       .subscribe(List<City> cities -> { 

        state.clear(); 
        state.addAll(cities); 
       }); 
    }); 
+0

间隔邮编是一个很好的方法。为我工作。接受你的答案。 – Mani 2014-11-03 13:48:37

1
stateRequest 
.flatMap(new Func1<State, Observable<State>>() { 
    @Override 
    public Observable<State> call(final State state) { 

     List<Observable> cityObservablesList = new ArrayList<Observable>(); 

     for(City city: state.getCityList()) { 
      cityObservablesList.add(restService.getCityDetail(city.getId()); 
     } 

     Observable cityObservables = Observable.from(cityObservablesList); 
     return Observables.zip(cityObservables, new FuncN<State>() { 
      @Override 
      public State call(Object... args) { 
       List<City> cityList = state.getCityList(); 
       cityList.clear(); 
       for(Object object: args) { 
        cityList.add((City)object); 
       } 

       return state; 
      } 
     }) 
    }) 
.subscribeOn(Schedulers.io()) 
.observeOn(AndroidSchedulers.mainThread()) 
.subscribe(new Subscriber<State>() { 
    @Override 
    public void onCompleted() { 

    } 

    @Override 
    public void onError(Throwable e) { 
    } 

    @Override 
    public void onNext(State result) { 
    // Get city list and display 
    } 
}); 

我把它带拉链运营商和可迭代的帮助城市列表作为第一个参数工作。 但我面临另一个问题。由于zip以并行方式执行作业,所以10-15个网络调用并行执行,服务器以最大每秒查询错误(QPS-403)拒绝。 我如何指导zip运算符一个接一个地执行任务?

我确实通过向城市可观测值添加延迟[delay(c * 200,TimeUnit.MILLISECONDS))]来解决此问题。但似乎不是一个合适的解决方案。

有没有建议吗?

+0

我的建议是要问这是一个新问题。不过,'zip'不应该并行执行这些项目--Rx具有行为保证。我会看看你在'for'循环中对'getCityDetail'的调用。这是什么原因造成了这个问题? – Enigmativity 2014-11-02 23:00:15

0

看看flatMap(功能..,双功能..)。也许这就是你需要的。

statesRepository.getStates() 
    .flatMap(states-> Observable.fromIterable(states)) 
    .flatMap(
      state-> cityRepository.getStateCities(state), 
      (state, cityList) -> { 
       state.setCities(cityList); 
       return state; 
      }) 
    .subscribe(state-> showStateWithCity(state));