2015-09-17 39 views
3

我使用了Android RX java和我的事件总线类妥善处理的onError我EventBus是如下如何使用RX的Java

public class EventBus { 
private final Subject<Event, Event> subject = new SerializedSubject<>(PublishSubject.<Event>create()); 
private Observable<Map<Type, Event>> stickyObservable; 

public EventBus() { 
    createStickyObservable(); 
} 

private void createStickyObservable() { 
    final List<Observable<Event>> observables = new ArrayList<>(); 

    final Observable<Map<Type, Event>> so = subject 
      .filter(event -> event.sticky) 
      .groupBy(event -> event.type) 
      .switchMap(groupedObservable -> { 
       BehaviorSubject<Event> bs = BehaviorSubject.create(); 
       groupedObservable.subscribe(bs); 
       observables.add(bs); 
       return Observable.combineLatest(observables, args -> { 
        Map<Type, Event> map = new HashMap<>(); 
        for (Object arg : args) { 
         Event event = (Event) arg; 
         map.put(event.type, event); 
        } 

        return map; 
       }); 
      }); 

    final BehaviorSubject<Map<Type, Event>> bs = BehaviorSubject.create(); 
    so.subscribe(bs); 
    stickyObservable = bs; 
} 

public Observable<Event> filter(final String pathExpression) { 
    final Pattern pattern = Pattern.compile(pathExpression); 

    return subject.filter(event -> { 
     if (event.path == null) { 
      return pathExpression == null; 
     } 
     return pattern.matcher(event.path).matches(); 
    }); 
} 

public Observable<Map<Type, Event>> getStickyObservable() { 
    return stickyObservable; 
} 


public void event(Event event) { 
    subject.onNext(event); 
} 

}

我得到了很多错误日志与rx.exceptions.OnErrorNotImplementedException:

我该如何解决这个问题?请建议我解决此问题

回答

3

当您订阅getStickyObservable()时,您需要实施onError方法(不要仅使用例如.subscribe(action)过载)。

+0

谢谢戴夫:)它为我工作 – Renjith