2017-10-21 114 views
0

我有一个(费时)计算,类似的东西:如何跳过可观测量的项目,而组合使用RxJS与流逝的定时器处理

async calculation(input: MyInputType): Promise<MyOutputType> { 
    // fetch some infos from an HTTP endpoint 
    // calculate result by combining received infos with input 
    return result; // Promise which resolves after all calculation is complete 
} 

计算所用的输入经由来自另一个部件一个可观察的(BehaviorSubject),它发出的物品有时比他们可以处理的速度快。因此,输出依赖于http端点的可观察信息和获取的信息,这些信息也可能随时发生变化。

重要:只有最后计算结果是我的应用程序相关,必须在任何时间的变量可用。
我的目标是使用RxJS和其反应性运营商设置一个可观察到的链具有以下行为:从所述源BehaviorSubject

  • 物品发射而计算当前正在处理应该被忽略(backpressuring防止DoS)但是...
  • 当前计算完成或发生超时时,如果有新的计算项目,则必须直接为最近的输入项目启动新的计算。
  • 如果在至少5分钟内没有开始计算,则必须再次使用最近的输入项目开始新的计算。 (这对更新即将到来的http信息很重要)

有没有人有任何想法如何解决这个问题?
(我使用的是RxJS 5,TypeScript,Node.js,但也欢迎其他语言的反应式解决方案。)

+0

我认为这个问题相当接近你想要达到的目标。 https://stackoverflow.com/q/46785128/482868 –

+1

如果计算时间超过5分钟,会发生什么情况?是否还有新的计算开始,如果有的话,旧计算会发生什么? – concat

+0

好问题,但是由于我在第二个项目符号中提到的超时,所以不应该发生这种情况。这个超时(计算本身)当然少于5分钟。因此在这种情况下可以放弃结果。 – Niehno

回答

0

从我的立场来看,有两个单独的问题。首先是如何在5分钟内计算已经计算的物品。我们首先填充空间大于5分钟的克隆重复的最后一个值。克隆将在稍后重要。

// inputs$: Observable<MyInputType> 
const spacefilled_inputs$ = inputs$.switchMap(input => Observable.interval(5000).map(() => Object.assign({}, input))) 
            .merge(input$); 

第二个问题是棘手:如何触发calculation两个先前计算(如果有一个)完成后,当一个新的输入从spacefilled_inputs$发射时?公平的警告:RxJS自然不会这么做。我先放下了一些代码,并解释后:

const promise_subject = new Subject<Promise<MyOutputType>>(); 
const results$ = promise_subject.asObservable() 
           .flatMap(promise => Observable.fromPromise(promise)); 
Observable.combineLatest(spacefilled_inputs$, results$) 
      .pairwise() 
      .scan(([inputs_changed, results_changed], [prev_pair, next_pair]) => { 
      // assumes that both MyInputType and MyOutputType are non-primitive objects 
      inputs_changed = inputs_changed || prev_pair[0] !== next_pair[0]; 
      results_changed = results_changed || prev_pair[1] !== next_pair[1]; 

      if(inputs_changed && results_changed) { 
       promise_subject.next(calculation(next_pair[1])); 
       return [false, false]; 
      } 
      return [inputs_changed, results_changed]; 
      }, [true, true]); 

最重要的,我们将通过受流的计算结果,使他们能信号进一步计算。这里的关键是pairwise,它发出前一个和后来的元素的帧,所以我们可以比较它们。如果输入和结果都发生了变化,则scan的主体会比较并保持状态。

注意检查更改对对象引用使用严格的等式。如果MyOutputTypeMyInputType是原语,则会错误地筛选出重复值。你需要做一些类似于输入和结果流的计数器。

+0

非常感谢您的回复! 第一部分:我很喜欢这个解决方案。我只是想知道为什么'Object.assign'是必要的。 (我的输入对象可能很大)。调用mapTo是不够的吗? (输入=> Observable.interval(5000).mapTo(输入))。合并(输入$)' 第二部分:我不得不承认,我还没有理解你的代码100% 。解决方案似乎对我来说比较复杂。当我更好地理解你的代码时,我会再次报告。第2部分更容易的解决方案是受欢迎的。 – Niehno

+0

@Niehno如果'scan'检测到上述变化,那么如果没有克隆,计算将不会每5分钟重试一次,因为它会得到相同的对象。我同意,这不是最高性能的解决方案,因此您可以将计数器与两个流相关联,但不幸的是以第2部分的复杂性为代价。 – concat