我正在Scala编写应用程序,我正在使用Akka流。Akka流 - 按流中元素的数量进行过滤
在某一点上,我需要过滤掉N个元素少于N个的流。因此,例如,与N=5
:
Source(List(1,2,3)).via(myFilter) // => List()
Source(List(1,2,3,4)).via(myFilter) // => List()
将成为空流,
Source(List(1,2,3,4,5)).via(myFilter) // => List(1,2,3,4,5)
Source(List(1,2,3,4,5,6)).via(myFilter) // => List(1,2,3,4,5,6)
将保持不变。
当然,我们无法知道流中元素的数量,直到它结束为止,并且在推送之前等到最后才可能不是最好的主意。
所以,相反,我已经考虑过下面的算法:
- 用于第一N-1的元素,只是缓冲它们,而无需进一步传递;
- 如果输入流在到达第N个元素之前完成,则输出空流;
- 如果输入流到达第N个元素,输出缓冲的N-1个元素,然后输出第N个元素,然后传递所有后面的元素。
但是,我不知道如何构建一个实现它的Flow
元素。是否有一些可以使用的内置Akka元素?
编辑:
好了,我打了昨天,我想出了这样的事情:
Flow[Int].
prefixAndTail(N).
flatMapConcat {
case (prefix, tail) if prefix.length == N =>
Source(prefix).concat(tail)
case _ =>
Source.empty[Int]
}
将它做我想做什么?