RxJava中的背压并不是真正的反压,而只是忽略了一些元素。RxJava中REAL背压的最佳实施
但是如果我不能释放任何元素而且我需要以某种方式减慢情绪?
RxJava不会影响元素意志,所以开发者需要自己实现它。但是如何?
想到最简单的方法就是使用一些计数器,在完成时递增和递减。
就像是:
public static void sleep(int ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
AtomicInteger counter = new AtomicInteger();
Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1));
Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5));
Observable.create(s -> {
while (!s.isUnsubscribed()) {
if (counter.get() < 100) {
s.onNext(Math.random());
counter.incrementAndGet();
} else {
sleep(100);
}
}
}).subscribeOn(sA)
.flatMap(r ->
Observable.just(r)
.subscribeOn(sB)
.doOnNext(x -> sleep(1000))
.doOnNext(x -> counter.decrementAndGet())
)
.subscribe();
}
但我觉得这样很可怜。有更好的解决方案吗?
队列没有被反压。背压点是向生产者发出信号以减缓排放。这可能不总是可能的,但在绝大多数情况下,这是可能的。这就是为什么在JDK9的新反应流规范中显式反压。 https://github.com/reactive-streams/reactive-streams-jvm – kaqqao