2017-05-03 110 views
1

我开始学习反应流,因为我对使用RxJava替代更传统的事件总线的新趋势感到好奇。 This blog post是如何完成的典型描述。如果我理解正确,RxJava 1.x并不严格执行反应流,但它非常相似。版本2.0包含一些符合要求的类,或者至少通过TCK,因此此代码的更新版本可能会有所不同。使用Reactive Streams Processor作为事件总线通常可以吗?

public class UserLocationModel { 

    private PublishSubject<LatLng> subject = PublishSubject.create(); 

    public void setLocation(LatLng latLng) { 
    subject.onNext(latLng); 
    } 

    public Observable<LatLng> getUserLocation() { 
    return subject; 
    } 
} 

在无流术语,我认为subjectProcessor,这既是一种PublisherSubscriber

的问题是,呼吁未订阅任何一个SubscriberonNext似乎违反了无流规范,特别rule 1.9

这仅仅是一个实现细节,它可以工作吗?我是否正确地认为你通常不能依赖于这个工作来实现兼容的Reactive Streams实现?

回答

4

Subject s和Processor s的标准RxJava 2是宽松的,所以您不必在调用其他方法之前先调用onSubscribe。这部分归因于传统性1.x主题没有onSubscribe,部分原因是RxJava 2处理器不能通过选择协调Subscriber端和Publisher端之间的请求,因此不能用于Subscription

如果您订购RxJava Processor任何符合RS的Publisher,它们似乎会尽可能请求Long.MAX_VALUE和中继信号。如果您订购符合RS标准的Subscriber到RxJava Processor s,他们将承诺那些Subscriber s的背压,并且不会溢出它们,但是,缺少请求可能导致个别发射MissingBackpressureException,并将Subscriber“扔”掉。 extensions library中有一个自定义的Publisher,用于协调请求。

我正确地认为你一般不能依靠这个工作来完成兼容的Reactive Streams实现。

中没有任何的规范,因此在TCK测试不应该用Processor未收到onSubscribe电话会怎么样但它需要它,因此,我认为这已经成为一个实现细节。

这里有两个更大的问题:

  1. 受试者发明与反应世界弥合势在必行世界和GUI案件和非backpressured案件事件的multicasters很好地工作。在无功 - 无功多播中,它们是更好更直接的选择,例如publish(Function)
  2. 在事件公交中思考是一个倒退,因为您通过在单个“铁路”上铲除和排除事件来创建单个堵塞点。相比之下,反应性的设计有利于个人和通常独立的流,其中每个流可以根据需要在线程之间跳转,并且可能在最后时刻避免主线程。