2017-03-02 83 views
4

我有一个无限的元素流,我想通过ID和聚合组进行分组,让我们说2秒,然后将它们发送到下游。这是不工作的代码,但可以更好地解释什么,我想:阿卡流。通过聚合一段时间,并发出结果

Source 
    .tick(0 second, 50 millis,() => if (Random.nextBoolean) (1, s"A") else (2, s"B")) 
    .map { f => f() } 
    .groupBy(10, _._1) 
    // how to aggregate grouped elements here for two seconds? 
    .scan(Seq[String]()) { (x, y) => x ++ Seq(y._2) } 
    .to(Sink.foreach(println)) 

和期望的输出应该是这样的:

Seq(A, A, A, A, A) 
Seq(B, B, B) 
Seq(A, A) 
Seq(B, B, B, B, B) 
// and so on 

我怎样才能实现与流这样的功能?

回答