2016-09-15 200 views
2

我有重复元素的列表,说:Rx等同于GROUP BY的COUNT?

Observable<String> source = Observable.just("A", "B", "A", "C", "C", "A"); 

我想通过他们与他们出现了多少次沿数值组他们,所以输出会是对的:

{"A", 3}, {"B", 1}, {"C", 2} 

基本上是一个SQL语句像SELECT x, COUNT(1) GROUP BY x;

相当于我只拿到了这么远,呼吁GROUPBY他们:

source.groupBy(x -> x, x -> 1) 

但是,这将流转换为GroupedObservables,我无法找到一个很好的示例如何与他们继续前进。我尝试了reduce(),但这里不好,因为在groupBy()之后,它希望减少分组的观察对象,而不是每个组内的元素。

这是可能与GroupedObservables?有没有其他方法可以达到预期效果?

+1

你知道吗,如果你有一个热点观测值或没有一个'OnComplete'信号,你不能做这个计算? – Enigmativity

+0

不,我没有想到,但似乎可以理解,谢谢。实际上,我列出了整个列表,而不是我的例子中的元素(我简化了问题的代码),所以我猜这在这里不是问题,对吧? –

+0

是的,但如果它是一个冷的可观测值,它会发出所有的值,您可能不需要Rx。你想要做什么? – Enigmativity

回答

8

以下代码:

source.groupBy(val -> val) 
    .flatMap(
     gr -> gr.count() 
       .map(count -> new Pair<>(gr.getKey(), count) 
    ) 
).subscribe(System.out::println); 

将打印出:

A=3 
B=1 
C=2 
+0

真高雅,谢谢! –

1
Observable<String> source = Observable.just("A", "B", "A", "C", "C", "A"); 
    Observable<KeyValue<String, Integer>> countStream = source 
      .groupBy(val -> val) 
      .flatMap(obs -> obs.count().flatMap(cnt -> Observable.just(new KeyValue<>(obs.getKey(), cnt)))); 

    private static class KeyValue<K, V> { 

    private final K key; 
    private final V val; 

    public KeyValue(K key, V val) { 
     this.key = key; 
     this.val = val; 
    } 

    public K getKey() { 
     return key; 
    } 

    public V getVal() { 
     return val; 
    } 
} 
1

的另一种方法是使用collectcollectInto方法,如下所示。

Observable<String> source = Observable.just("A", "B", "A", "C", "C", "A"); 

source.collectInto(new HashMap<String, MutableInt>(), (map, elem) -> { 
    if (map.containsKey(elem)) { 
    map.get(elem).increment(); 
    } else { 
    map.put(elem, new MutableInt(1)); 
    } 

}).subscribe(System.out::println); 

顺便说一句,如果我们使用Reactorcollect是这样做的提示方式,因为在大量组的情况下,GROUPBY后flatMap将挂起。

javadoc of Reactor

注意GROUPBY效果最好组的低基数,所以相应地选择你的keyMapper功能。
...
值得注意的是,当条件产生大量的组时,如果组不适合下游使用(例如由于设置了过低的maxConcurrency参数的flatMap),则可能导致挂起。

还有一个与此相关的github issue