2016-11-12 55 views
2

RxJava中的背压并不是真正的反压,而只是忽略了一些元素。RxJava中REAL背压的最佳实施

但是如果我不能释放任何元素而且我需要以某种方式减慢情绪?

RxJava不会影响元素意志,所以开发者需要自己实现它。但是如何?

想到最简单的方法就是使用一些计数器,在完成时递增和递减。

就像是:

public static void sleep(int ms) { 
    try { 
     Thread.sleep(ms); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
} 

public static void main(String[] args) throws InterruptedException { 

    AtomicInteger counter = new AtomicInteger(); 

    Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1)); 
    Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5)); 

    Observable.create(s -> { 
     while (!s.isUnsubscribed()) { 
      if (counter.get() < 100) { 
       s.onNext(Math.random()); 
       counter.incrementAndGet(); 
      } else { 
       sleep(100); 
      } 
     } 
    }).subscribeOn(sA) 
      .flatMap(r -> 
        Observable.just(r) 
          .subscribeOn(sB) 
          .doOnNext(x -> sleep(1000)) 
          .doOnNext(x -> counter.decrementAndGet()) 
      ) 
      .subscribe(); 
} 

但我觉得这样很可怜。有更好的解决方案吗?

回答

-1

正如你自己提到的,这实际上与RxJava无关。
如果你最终必须处理所有的事件,但要做到这一点,在自己的节奏,用队列:

ExecutorService emiter = Executors.newSingleThreadExecutor(); 
    ScheduledExecutorService workers = Executors.newScheduledThreadPool(4); 
    BlockingQueue<String> events = new LinkedBlockingQueue<>(); 


    emiter.submit(() -> { 
     System.out.println("I'll send 100 events as fast as I can"); 

     for (int i = 0; i < 100; i++) { 
      try { 
       events.put(UUID.randomUUID().toString()); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    }); 

    workers.scheduleWithFixedDelay(
      () -> { 
       String result = null; 
       try { 
        result = events.take(); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 

       System.out.println(String.format("I don't care, got %s only now", result)); 
      }, 0, 1, TimeUnit.SECONDS 
    ); 
+1

队列没有被反压。背压点是向生产者发出信号以减缓排放。这可能不总是可能的,但在绝大多数情况下,这是可能的。这就是为什么在JDK9的新反应流规范中显式反压。 https://github.com/reactive-streams/reactive-streams-jvm – kaqqao

2

好了,背压RxJava是不是真实的背压

RxJava的背压通过请求通道实现后续生产者和消费者之间的非阻塞合作。消费者通过request()请求一定数量的元素,并且制作者通过onNext创建/生成/发出至多该项目的数量,有时延迟在onNext之间。

但只忽略了一些元素集。

只有当您明确告诉RxJava删除任何溢出时,才会发生这种情况。

RxJava不会影响元素意志,所以开发者需要自己实现它。但是如何?

使用Observable.create需要有关如何实施非阻塞反压的实用知识,实际上不建议图书馆用户使用。 RxJava有很多方法可以让你启用背压,流无并发症:

Observable.range(1, 100) 
.map(v -> Math.random()) 
.subscribeOn(sA) 
.flatMap(v -> 
    Observable.just(v).subscribeOn(sB) 
    .doOnNext(x -> sleep(1000)) 
) 
.subscribe(); 

Observable.create(SyncOnSubscribe.createStateless(
    o -> o.onNext(Math.random()) 
) 
.subscribeOn(sA) 
... 
+0

我读过有关背压的话题。我正在寻求减慢我的例子的元素选择的方式。假设我正在通过光标从数据库中读取记录,并且我需要放慢它,导致系统进程记录速度较慢,然后生成器发出它们。 – corvax

+0

这是基于拉动的,并且拉动速度与下游可以消耗的速度一样快。 – akarnokd