2017-11-17 200 views
1

一些大写金额我想测试像这样的情况后subsrcibe它:如何合并2个独立的数据流,从他们缓冲填充的数据和时间

我有2类新刚刚从同一个延伸。

我创建,并从项目的每个班级列表观测量:

val listSomeClass1 = ArrayList<SomeClass1>() 
val listSomeClass2 = ArrayList<SomeClass2>() 

fun populateJust1() { 
    listSomeClass1.add(SomeClass1("23", 23)) 
    listSomeClass1.add(SomeClass1("24", 24)) 
    listSomeClass1.add(SomeClass1("25", 25)) 
} 

fun populateJust2() { 
    listSomeClass2.add(SomeClass2(23.00)) 
    listSomeClass2.add(SomeClass2(24.00)) 
    listSomeClass2.add(SomeClass2(25.00)) 
} 

populateItemsSomeClass1() 
populateItemsSomeClass2() 

现在我可以创建2个观测值:

val someClass1Observable = Observable.fromIterable(listSomeClass1) 
    val someClass2Observable = Observable.fromIterable(listSomeClass2) 

而在这里,我想从合并排放他们,缓冲它,并在10秒后订阅它:

 Observable.merge(someClass1Observable, someClass2Observable) 
      .buffer(10, TimeUnit.SECONDS) 
      .doOnSubscribe { Log.v("parentObservable", "STARTED") } 
      .subscribe { t: MutableList<Parent> -> 
       Log.v("parentObservable", "onNext") 
       t.forEach { Log.v("onNext", it.toString()) } 
      } 

但是,可观察是没有像我预期的那样在10秒后开始,并且随着这个数据准备好开始immedietaly。 如何模拟这样的事情,我会收集2个独立的流,10秒后我将能够获得收集的数据

我必须指出,我不想使用任何主题。

UPDATE

我做somehitng这样的:

val list1 = listOf(SomeClass1("1", 1), SomeClass1("2", 2), SomeClass1("3", 3)) 
    val list2 = listOf(SomeClass2(5.00), SomeClass2(4.00), SomeClass2(6.00)) 

    val someClass1Observable = Observable 
      .fromIterable(list1) 
      .zipWith(Observable.interval(2, TimeUnit.SECONDS), 
        BiFunction { item: SomeClass1, _: Long -> item }) 



    val someClass2Observable = Observable 
      .fromIterable(list2) 
      .zipWith(Observable.interval(1, TimeUnit.SECONDS), 
        BiFunction { item: SomeClass2, _: Long -> item }) 



    someClass1Observable.subscribe { 
       Log.v("someClass1", it.toString()) 
      } 

    someClass2Observable.subscribe { 
       Log.v("someClass2", it.toString()) 
      } 


    Observable.merge(someClass1Observable, someClass2Observable) 
      .buffer(10, TimeUnit.SECONDS) 
      .delay(10, TimeUnit.SECONDS) 
      .doOnSubscribe { Log.v("parentObservable", "STARTED") } 
      .subscribe { t: MutableList<Parent> -> 
       Log.v("parentObservable", "onNext") 
       t.forEach { Log.v("onNext", it.toString()) } 
      } 

    Thread.sleep(13000) 

    someClass1Observable.subscribe { 
     Log.v("someClass1", it.toString()) 
    } 

    someClass2Observable.subscribe { 
     Log.v("someClass2", it.toString()) 
    } 

在这里,我想只是模拟同为合并可观测someClass1和someclass2观测量和2个无限流。

同样,我想有能力合并这两个流,缓冲区填充数据,并在10秒后做一些事情。如果10秒后这两个流将再次填充一些数据,则合并Observable应该清除先前的缓冲区,并且应该再次缓冲新数据并在10秒后发射等等,无限。然而,我的代码并没有像我期望的那样工作,我需要做些什么改变才能使它像我描述的那样?

回答

1

我认为你正在寻找的delay操作

http://reactivex.io/documentation/operators/delay.html

延迟 的排放量可观察着由特定的移动量在时间

所以是这样的:

.delay(10, TimeUnit.SECONDS) 
+0

感谢您的建议,但它很接近,但不会回答我的问题,请参阅我更新的问题,我做错了什么? – Konrad

相关问题