2017-10-16 81 views
2

我试图在中间应用flatMap之后保持原始顺序。阐述了我的意思了图Rx swift在使用flatMap后保持原始流的原始顺序地图

这里是:

---- 2-4-1 ------------------(原始流)

----------- 1--2 --------- 4--(网络活动 - 由flatMap延迟表示)

------ --------- --------- 2 4-1(那受通缉的结果)

下面是详细情况代码:

persistMessageEventBus.flatMap({ num -> Observable<Int> in 

     print("aaab Doing \(num)") 

     let t2g = Observable.just(num).delay(Double(num), scheduler: MainScheduler.instance).do(onNext:{ num in print("aaab Done async \(num)")}) 

     return t2g 

    }).concatMap({ num -> Observable<Int> in 

     print("aaab Done map \(num)") 

     return Observable.just(num) 

    }).subscribe(onNext: { num in 

     print("aaab done \(num)") 

    }).addDisposableTo(disposeBag) 

    persistMessageEventBus.onNext(2) 
    persistMessageEventBus.onNext(4) 
    persistMessageEventBus.onNext(1) 

输出是:

aaab Doing 2 
aaab Doing 4 
aaab Doing 1 
aaab Done async 1 
aaab Done map 1 
aaab done 1 
aaab Done async 2 
aaab Done map 2 
aaab done 2 
aaab Done async 4 
aaab Done map 4 
aaab done 4 

的通缉的输出是:

aaab Doing 2 
aaab Doing 4 
aaab Doing 1 
aaab Done async 1 
aaab Done async 2 
aaab Done map 2 
aaab done 2 
aaab Done async 4 
aaab Done map 4 
aaab done 4 
aaab Done map 1 
aaab done 1 

是否有类似的东西在RxSwift?

回答

1

改为使用.concatMap(),它保证了原始顺序。

更新#1

那么显然它需要索引和一些缓冲。

 typealias Indexed = (num: Int, index: Int) 

     class Buffer { 
      let ordered = PublishSubject<Int>() 
      private var current = 0 
      private var buffer: [Int: Int] = [:] 
      func onNext(_ indexed: Indexed) { 
       self.buffer[indexed.index] = indexed.num 
       for index in self.buffer.keys.sorted() { 
        if index == current { 
         ordered.onNext(self.buffer[index]!) 
         self.buffer.remove(at: self.buffer.index(forKey: index)!) 
         current += 1 
        } 
       } 
      } 
     } 

     let buffer = Buffer() 

     buffer 
      .ordered 
      .subscribe(onNext: { num in 

       print("aaab done \(num)") 

      }) 
      .disposed(by: disposeBag) 

     persistMessageEventBus 
      .mapWithIndex { (num, index) -> Indexed in 
       return (num: num, index: index) 
      } 
      .flatMap({ indexed -> Observable<Indexed> in 

       print("aaab Doing \(indexed.num)") 

       let t2g = Observable.just(indexed).delay(Double(indexed.num), scheduler: MainScheduler.instance).do(onNext: { indexed in print("aaab Done async \(indexed.num)") }) 

       return t2g 

      }) 
      .subscribe(onNext: { indexed in 
       buffer.onNext(indexed) 
      }) 
      .disposed(by: disposeBag) 

     persistMessageEventBus.onNext(2) 
     persistMessageEventBus.onNext(4) 
     persistMessageEventBus.onNext(1) 
 
aaab Done async 1 
aaab done 2 
aaab Done async 2 
aaab done 4 
aaab Done async 4 
aaab done 1 
+0

是的,但随后的网络活动不会在平行所以不是寻找这样的:--------------- 2 ----- ---- 4-1流将看起来像这样--------------- 2 --------------------- ---- 4-1 – Rotem

+0

@Rotem见** UPDATE#1 ** –

+0

谢谢@ maxim-volgin,这就是我为解决这个问题所做的一切,尽管我提出这个问题的原因是因为我想知道在Rx中是否有本地运营商。无论如何,我会接受这个答案,谢谢,gg! – Rotem

相关问题