2015-03-30 85 views
0

我有我需要解析字符串的应用程序,我使用它RxScala。我的代码:RxScala缓冲区元素

import java.util.concurrent.TimeUnit 
import rx.lang.scala.Subject 
import rx.lang.scala.schedulers.NewThreadScheduler 
import rx.lang.scala.subjects.{SerializedSubject, PublishSubject} 
import scala.concurrent.duration.Duration 

object RxScala extends App { 
    val subject: Subject[String] = SerializedSubject(PublishSubject()) 

    val processLines = (lines: Seq[String]) => { 
    // long action 
    } 

    subject 
    .subscribeOn(NewThreadScheduler()) 
    .tumblingBuffer(Duration(2, TimeUnit.SECONDS), 100) 
    .subscribe(processLines) 

    for(i <- 1 to 100000) { 
    subject.onNext("Line " + i) 
    } 
} 

我有问题,因为我加快行数,然后我可以处理它们。

我想创建缓冲区,例如200行,如果缓冲区已满,忽略缓冲区未满时的新记录,例如

Add 100 records (A) 
Add 100 records (B) 
Program start processLines (A) // buffer have (B) elements 
Add 100 records (C) // buffer have (B, C) elements and it is full 
Add 100 records (D) // elements are ignored 
ProcessLines is finished 
Program start processLines (B) // buffer have (C) elements 
Add 100 records (E) // buffer have (C, E) elements 

RxScala是否有办法做到这一点?

+0

你试过'onBackpressureDrop'吗? – 2015-03-31 09:15:57

回答

0
subject. 
    tumblingBuffer(Duration(2, TimeUnit.SECONDS), 100). 
    onBackpressureDrop. 
    observeOn(NewThreadScheduler()). 
    subscribe(processLines) 

http://reactivex.io/documentation/operators/backpressure.html

onBackpressureDrop 滴从源可观察排放除非有来自下游订户未处理的请求,在这种情况下它将 发出足够的项目以完成该请求