2017-07-19 106 views
10

关于Java9中新的Flow相关接口,我来到了article。从那里示例代码:流程编程:订阅者和发布者跟踪计数?

public class MySubscriber<T> implements Subscriber<T> { 
    private Subscription subscription;  
    @Override 
    public void onSubscribe(Subscription subscription) { 
    this.subscription = subscription; 
    subscription.request(1); //a value of Long.MAX_VALUE may be considered as effectively unbounded 
    }   
    @Override 
    public void onNext(T item) { 
    System.out.println("Got : " + item); 
    subscription.request(1); //a value of Long.MAX_VALUE may be considered as effectively unbounded 
    } 

正如你所看到的,onNext()请求一个新项目推。

现在我想知道:

  • 如果onSubscribe()要求,说5个项目
  • 并且第一个项目被交付后,request(1)被称为像上面

目前预计到服务器发送

  • 5项(5请求-1发送+1必要相关捐资)
  • 或一个项目(因为以前的请求被通过新的请求“丢弃”)

换句话说:当request()叫了几次,做这些数字加起来;或之前的请求是否被“丢弃”?

通向问题标题 - 用户是否需要跟踪收到的项目,以避免在某个时候请求“太多”项目。

+6

似乎将增加:[_Adds给定的数字'N'项目,以目前的** ** unfulfille为d这subscription_需求(http://download.java.net/java/jdk9/docs/ api/java/util/concurrent/Flow.Subscription.html#request-long-) –

+0

@SotiriosDelimanolis我认为你应该写更多的答案,单独评论真的很有帮助。在别处表达我的感激之情;-) – GhostCat

回答

6

由于索蒂里奥斯points out,则request method's Javadoc国家(重点煤矿):

添加给定的项目为这个订阅目前未实现的需求数量n。如果n小于或等于零,则订阅服务器将收到带有IllegalArgumentException参数的onError信号。否则,订户将收到最多n附加上的下一个调用(或更少,如果终止)。

因此,答案显然是肯定的,用户需要跟踪项目的。实际上,这就是机制的全部重点。一些背景:request方法意在允许订户应用backpressure,通知上游组件它已经过载并且“需要中断”。因此,订户的(和只有它的)任务仔细审查何时以及要求多少新项目。在这一行中,它不能“重新考虑”并减少要接收的项目数量。

降低数量也将使在这个意义上,完全请求的项目数量可能会突然降低(因为它代表它只能增加),发布者和订阅“非单调”之间的通信。这不仅在抽象意义上令人讨厌,而且带来了具体的一致性问题:当订户突然将请求的项目数量减少到1时,发布商可能正在交付几件商品 - 现在呢?

3

虽然Java 9没有实现Reactive Streams API,但它提供了几乎相同的API并定义了相应的行为。

而在Reactive Streams specification这个相加由以下定义:

1.1对于出版者:

onNext信号的由发布者发送到订户的总数必须小于或等于该订户在任何时间所请求的元素总数。

以及用于3.8认购:

虽然认购没有被取消,Subscription.request(长N)必须注册的附加元件给定数量的待产生的相应的用户。

所以Java坚持那是无流API中给出的规范。

相关问题