2017-08-02 92 views
2

我正在Scala编写应用程序,我正在使用Akka流。Akka流 - 按流中元素的数量进行过滤

在某一点上,我需要过滤掉N个元素少于N个的流。因此,例如,与N=5

Source(List(1,2,3)).via(myFilter)  // => List() 
Source(List(1,2,3,4)).via(myFilter)  // => List() 

将成为空流,

Source(List(1,2,3,4,5)).via(myFilter) // => List(1,2,3,4,5) 
Source(List(1,2,3,4,5,6)).via(myFilter) // => List(1,2,3,4,5,6) 

将保持不变。

当然,我们无法知道流中元素的数量,直到它结束为止,并且在推送之前等到最后才可能不是最好的主意。

所以,相反,我已经考虑过下面的算法:

  1. 用于第一N-1的元素,只是缓冲它们,而无需进一步传递;
  2. 如果输入流在到达第N个元素之前完成,则输出空流;
  3. 如果输入流到达第N个元素,输出缓冲的N-1个元素,然后输出第N个元素,然后传递所有后面的元素。

但是,我不知道如何构建一个实现它的Flow元素。是否有一些可以使用的内置Akka元素?

编辑:

好了,我打了昨天,我想出了这样的事情:

Flow[Int]. 
    prefixAndTail(N). 
    flatMapConcat { 
    case (prefix, tail) if prefix.length == N => 
     Source(prefix).concat(tail) 
    case _ => 
     Source.empty[Int] 
    } 

将它做我想做什么?

回答

1

这可能是其中一个小“状态”可以走很长的路。即使解决方案不是“纯粹的功能”,更新状态将被系统的其他部分隔离和无法访问。我认为这是斯卡拉的美丽之一:当FP解决方案不明显时,您可以始终以孤立的方式回复命令......

完成的Flow将是多个子部分的组合。第一流只会组的元素融入尺寸N的序列:

现在的非功能部分,一个过滤器,只能通过当第一顺序是正确的尺寸允许分组Seq值:

val minSizeRequirement : Int => Seq[Int] => Boolean = 
    (minSize) => { 
    var isFirst : Boolean = True 

    var passedMinSize : Boolean = False 

    (testSeq) => { 
     if(isFirst) { 
     isFirst = False 
     passedMinSize = testSeq.size >= minSize 
     passedMinSize 
     } 
     else 
     passedMinSize 
     } 
    } 
    } 

val minSizeFilter : Int => Flow[Seq[Int], Seq[Int], _] = 
    (minSize) => Flow[Seq[Int]].filter(minSizeRequirement(minSize)) 

的最后一步是将Seq[Int]值转换回Int值:

val flatten = Flow[Seq[Int]].flatMapConcat(l => Source(l)) 

最后,结合它们放在一起:

val combinedFlow : Int => Flow[Int, Int, _] = 
    (minSize) => 
    group(minSize) 
     .via(minSizeFilter(minSize)) 
     .via(flatten) 
0

也许statefulMapConcat可以帮助你:

import akka.actor.ActorSystem 
import akka.stream.scaladsl.{Sink, Source} 
import akka.stream.{ActorMaterializer, Materializer} 

import scala.collection.mutable.ListBuffer 
import scala.concurrent.ExecutionContext 

object StatefulMapConcatExample extends App { 

    implicit val system: ActorSystem = ActorSystem() 
    implicit val materializer: Materializer = ActorMaterializer() 
    implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global 

    def filterLessThen(threshold: Int): (Int) => List[Int] = { 
    var buffering = true 
    val buffer: ListBuffer[Int] = ListBuffer() 
    (elem: Int) => 
     if (buffering) { 
     buffer += elem 
     if (buffer.size < threshold) { 
      Nil 
     } else { 
      buffering = false 
      buffer.toList 
     } 
     } else { 
     List(elem) 
     } 
    } 

    //Nil 
    Source(List(1, 2, 3)).statefulMapConcat(() => filterLessThen(5)) 
    .runWith(Sink.seq).map(println) 

    //Nil 
    Source(List(1, 2, 3, 4)).statefulMapConcat(() => filterLessThen(5)) 
    .runWith(Sink.seq).map(println) 

    //Vector(1,2,3,4,5) 
    Source(List(1, 2, 3, 4, 5)).statefulMapConcat(() => filterLessThen(5)) 
    .runWith(Sink.seq).map(println) 

    //Vector(1,2,3,4,5,6) 
    Source(List(1, 2, 3, 4, 5, 6)).statefulMapConcat(() => filterLessThen(5)) 
    .runWith(Sink.seq).map(println) 
}