2016-11-09 66 views
0

在RxJava 1/RxScala中,如何在下列情况下节流/背压可观测源?基于资源稀缺可观测到的背压

def fast: Observable[Foo] // Supports backpressure 

def afterExpensiveOp: Observable[Bar] = 
    fast.flatMap(foo => Observable.from(expensiveOp(foo)) 

// Signature and behavior is out of my control 
def expensiveOp(foo: Foo)(implicit ec: ExecutionContext): Future[Bar] = { 
    if(noResources()) Future.failed(new OutOfResourcesException()) 
    else Future { Bar() } 
} 

一个可能的解决方案是阻止直到。其工作,但,这是非常不雅,防止多个同时请求:

def afterExpensiveOp: Observable[Bar] = fast.flatMap(foo => 
    Observable.just(Observable.from(expensiveOp(foo)).toBlocking.head) 
) 
+1

可能你必须为此编写自己的操作符。在RxScala中,一个运算符是一个函数'Subscriber [T] => Subscriber [R]',您可以使用'lift'将它应用到'Observable'。在某些时候,您创建的Subscriber [R]将不得不检查是否有可用的资源,如果是,请调用其继承的'request'方法以从'fast'可观察值中获取更多项。 –

回答

0

flatMap有一个参数来限制并发用户的数量。如果你使用这个flatMap来照顾你的背压。

def afterExpensiveOp = fast.flatMap(safeNumberOfConccurrentExpensiveOps, x => Observable.from(expensiveOp(x)))