0
我有我需要解析字符串的应用程序,我使用它RxScala。我的代码:RxScala缓冲区元素
import java.util.concurrent.TimeUnit
import rx.lang.scala.Subject
import rx.lang.scala.schedulers.NewThreadScheduler
import rx.lang.scala.subjects.{SerializedSubject, PublishSubject}
import scala.concurrent.duration.Duration
object RxScala extends App {
val subject: Subject[String] = SerializedSubject(PublishSubject())
val processLines = (lines: Seq[String]) => {
// long action
}
subject
.subscribeOn(NewThreadScheduler())
.tumblingBuffer(Duration(2, TimeUnit.SECONDS), 100)
.subscribe(processLines)
for(i <- 1 to 100000) {
subject.onNext("Line " + i)
}
}
我有问题,因为我加快行数,然后我可以处理它们。
我想创建缓冲区,例如200行,如果缓冲区已满,忽略缓冲区未满时的新记录,例如
Add 100 records (A)
Add 100 records (B)
Program start processLines (A) // buffer have (B) elements
Add 100 records (C) // buffer have (B, C) elements and it is full
Add 100 records (D) // elements are ignored
ProcessLines is finished
Program start processLines (B) // buffer have (C) elements
Add 100 records (E) // buffer have (C, E) elements
RxScala是否有办法做到这一点?
你试过'onBackpressureDrop'吗? – 2015-03-31 09:15:57