2014-11-14 64 views
6

我有一个可观察的数据来自数据库游标的快速数据流。我正在考虑以每秒x项目的速度节制输出。到目前为止,我一直在使用上的文档描述调用栈阻塞:速率限制可观察到的

observable.map(f -> { 
ratelimiter.acquire(); // configured limiter to only allow 
}); 

这是工作正常,但只是出于好奇有没有更好的方式来处理这个使用背压?

韩国社交协会

+0

你想'延迟'或'throttleFirst(throttleLast)'?如果收到物品太快,后者将丢弃物品。 – zsxwing 2014-11-17 06:39:07

回答

2

你可以尝试使用rx.Observable#onBackpressureBuffer()与自定义的用户将定期要求n项目每结合第二。但是,你将被绑定到硬件一秒钟采样。

注意.subscribeOn().toBlocking()只是使主要方法不立即退出。

public class BackpressureTest { 

    public static void main(final String[] args) { 
    Observable.range(1, 1000) 
     .compose(Observable::onBackpressureBuffer) // consume source immediately, but buffer it 
     .lift(allowPerSecond(3)) // via operator using custom subscriber request n items per second 
     .subscribeOn(Schedulers.computation()) 
     .toBlocking() 
     .subscribe(System.out::println); 
    } 

    private static <T> Observable.Operator<T, T> allowPerSecond(final int n) { 
    return upstream -> periodicallyRequestingSubscriber(upstream, n); 
    } 

    private static <T> Subscriber<T> periodicallyRequestingSubscriber(final Subscriber<T> upstream, final int n) { 
    return new Subscriber<T>() { 

     @Override 
     public void onStart() { 
     request(0); // request 0 so that source stops emitting 
     Observable.interval(1, SECONDS).subscribe(x -> request(n)); // every second request n items 
     } 

     @Override 
     public void onCompleted() { 
     upstream.onCompleted(); 
     } 

     @Override 
     public void onError(final Throwable e) { 
     upstream.onError(e); 
     } 

     @Override 
     public void onNext(final T integer) { 
     upstream.onNext(integer); 
     } 
    }; 
    } 
} 
0

@ michalsamek的答案似乎是正确的,尽管背压只适用于Flowables。我已更正了他的订阅者,以便它能够完成要求的内容。

在不同时间爆发时使用它也有一个小问题。

private static <T> FlowableOperator<T, T> allowPerMillis(int millis) { 
    return observer -> new PeriodicallyRequestingSubscriber<>(observer, millis); 
} 


Observable.range(1, 100) 
    .observeOn(Schedulers.io()) 
    .toFlowable(BackpressureStrategy.BUFFER) 
    .compose(Flowable::onBackpressureBuffer) 
    .lift(allowPerMillis(200)) 
    .subscribe(value -> System.out.println(System.currentTimeMillis() % 10_000 + " : " + value)); 



public class PeriodicallyRequestingSubscriber<T> implements Subscriber<T> { 

    private final Subscriber<T> upstream; 

    private final int millis; 

    // If there hasn't been a request for a long time, do not flood 
    private final AtomicBoolean shouldRequest = new AtomicBoolean(true); 

    public PeriodicallyRequestingSubscriber(Subscriber<T> upstream, int millis) { 
     this.upstream = upstream; 
     this.millis = millis; 
    } 

    @Override 
    public void onSubscribe(Subscription subscription) { 
     Observable 
       .interval(millis, TimeUnit.MILLISECONDS) 
       .subscribe(x -> { 
        if (shouldRequest.getAndSet(false)) 
         subscription.request(1); 
       }); 
} 

@Override 
public void onNext(T t) { 
    shouldRequest.set(true); 
    upstream.onNext(t); 
} 

@Override 
public void onError(Throwable throwable) { 
    upstream.onError(throwable); 
} 

@Override 
public void onComplete() { 
    upstream.onComplete(); 
} 
}