0
在RxJava 1/RxScala中,如何在下列情况下节流/背压可观测源?基于资源稀缺可观测到的背压
def fast: Observable[Foo] // Supports backpressure
def afterExpensiveOp: Observable[Bar] =
fast.flatMap(foo => Observable.from(expensiveOp(foo))
// Signature and behavior is out of my control
def expensiveOp(foo: Foo)(implicit ec: ExecutionContext): Future[Bar] = {
if(noResources()) Future.failed(new OutOfResourcesException())
else Future { Bar() }
}
一个可能的解决方案是阻止直到。其工作,但,这是非常不雅,防止多个同时请求:
def afterExpensiveOp: Observable[Bar] = fast.flatMap(foo =>
Observable.just(Observable.from(expensiveOp(foo)).toBlocking.head)
)
可能你必须为此编写自己的操作符。在RxScala中,一个运算符是一个函数'Subscriber [T] => Subscriber [R]',您可以使用'lift'将它应用到'Observable'。在某些时候,您创建的Subscriber [R]将不得不检查是否有可用的资源,如果是,请调用其继承的'request'方法以从'fast'可观察值中获取更多项。 –