2016-11-08 57 views
0

我希望创建一个返回observable<Location>它,我可以发送一个新的位置和用户获取最后一个加入任何后续值LocationHandler类。RX的Java 2,可观察到接受新值添加

我写这个类,它的工作原理,但我不知道这是否是这样做,因为我已经添加了一个回调,我闻到不好的正确方法。

感谢您的任何帮助。

public class LocationHandler { 
    private MessageHandler<Location> onNewItem; 
    private Observable<Location> locationObservable; 

    public LocationHandler(LocationInitializationBuilder locationInitBuilder) { 
     locationObservable = getHookedObservable() 
       .mergeWith(locationInitBuilder.build()) 
       .replay(1).autoConnect(); 
    } 


    private Observable<Location> getHookedObservable() { 
     return Observable.create(new ObservableOnSubscribe<Location>() { 
      @Override 
      public void subscribe(ObservableEmitter<Location> e) throws Exception { 
       onNewItem = location -> e.onNext(location); 
      } 
     }); 
    } 

    public Observable<Location> getLocation(){ 
     return locationObservable; 
    } 

    public void setLocation(Location address){ // <---------- add new values 
     if (onNewItem != null){ 
      onNewItem.handleMessage(address); 
     } else { 
      throw new IllegalStateException("Cannot add an item to a never subscribed stream"); 
     } 
    } 
} 

下面我用一个ReplaySubject修改了它@Blackbelt建议。

public class LocationHandler { 
    private ReplaySubject<Location> inputStream = ReplaySubject.create(1); 
    private Observable<Location> locationObservable; 

    public LocationHandler(LocationInitializationBuilder locationInitBuilder) { 
     locationObservable = locationInitBuilder.build() 
       .mergeWith(inputStream) 
       .replay(1).autoConnect(); 
    } 

    public Observable<Location> getLocation(){ 
     return locationObservable; 
    } 

    public void setLocation(Location address){ 
     inputStream.onNext(address); 
    } 
} 

回答

2

你可以使用的,而不是SubjectMessageHandler。主体可以同时作为可观察用户和订户。您可以在您的LocationHandler中使用返回Subject#asObservable的方法,您将订阅该方法。在内部,当setLocation,你将不得不调用Subject#onNext提供的位置。有不同类型的主题可用。请参阅文档以选择更适合您需求的文档。例如。

public class LocationHandler { 
    BehaviorSubject<GeevLocation> mLocationSubject = BehaviorSubject.create(); 

    public Observable<GeevLocation> getLocation() { 
     return mLocationSubject.asObservable(); 
    } 

    public void setLocation(GeevLocation address){ 
     mLocationSubject.onNext(address); 
    } 
} 

从外面呼叫getLocation和订阅返回Observable。当一个setLocation被称为你会得到的对象onNext

1

作为黑带已经告诉你,你会使用一个主题。特别是我会使用BehaviorSubject。主题默认为热门,但他们可以通过订阅重播活动。如果您订阅,BehaviorSubject会为您提供最后发布的值或初始值。每个用户都会得到这些值。流将永远不会完成,因为它很热。请记住处理错误,因为第二个onError将被吞噬。

例码

class Location { 

} 

class LocationInitializationBuilder { 
    static Location build() { 
     return new Location(); 
    } 
} 

class LocationHandler { 
    private Subject<Location> locationObservable; 

    public LocationHandler(LocationInitializationBuilder locationInitBuilder) { 
     Location initialValue = LocationInitializationBuilder.build(); 

     locationObservable = BehaviorSubject.<Location>createDefault(initialValue).toSerialized(); 
    } 

    public Observable<Location> getLocation() { 
     return locationObservable.hide(); 
    } 

    public void setLocation(Location address) { // <---------- add new values 
     locationObservable.onNext(address); 
    } 
} 

public class LocationTest { 
    @Test 
    public void name() throws Exception { 
     LocationHandler locationHandler = new LocationHandler(new LocationInitializationBuilder()); 

     TestObserver<Location> test = locationHandler.getLocation().test(); 

     locationHandler.setLocation(new Location()); 

     test.assertValueCount(2); 
    } 
} 
+0

其实我不能,因为它是一个流通过我得到LocationInitializationBuilder.build使用行为。行为需要一个我无法在创作时提供的明确价值。 –

+0

是的,我看到问题出在哪里。您的解决方案看起来合法 –

+1

'BehaviourSubject'有一个静态方法'create',它创建一个空的'BehaviourSubject' – Blackbelt