2016-11-23 80 views
6

说,我有两个来源:一个人如何控制的阿卡流的基础上另一个流流

val ticks = Source(1 to 10) 
val values = Source[Int](Seq(3,4,4,7,8,8,8,8,9).to[collection.immutable.Iterable]) 

我想创建一个基于当前值在阿卡流的Graph[...]处理步骤ticks数据流尽可能在数据流中消耗。因此,例如,当值匹配我想返回所有匹配的第二源的元素,否则保持滴答导致类似的输出:

(1, None) 
(2, None) 
(3, Some(Seq(3))) 
(4, Some(Seq(4, 4))) 
(5, None) 
(6, None) 
(7, Some(Seq(7))) 
(8, Some(Seq(8,8,8,8))) 
(9, Some(Seq(9))) 
(10, None) 

你将如何实现这一行为?

回答

1

我建议你看看关于这个问题的阿卡流文档:http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-graphs.html

根据该网站,就可以实现这样的GraphStage:

final class AccumulateWhileUnchanged[E] extends GraphStage[FlowShape[E, immutable.Seq[E]]] { 

val in = Inlet[E]("AccumulateWhileUnchanged.in") 
val out = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out") 

override def shape = FlowShape(in, out) 
} 

还有一个博客文章在这个问题上:http://blog.kunicki.org/blog/2016/07/20/implementing-a-custom-akka-streams-graph-stage/

希望这会有所帮助:)

+0

您能否给出具体细节?你的答案只是表明可以写一个自定义的阶段,它不提供解决给定问题的阶段...... –

相关问题