2017-08-21 21 views

回答

3

这可以帮助,您可以使用Source.combine方法加入S1和S2

http://doc.akka.io/docs/akka/current/scala/stream/stages-overview.html#combine

http://doc.akka.io/docs/akka/current/scala/stream/stream-graphs.html#combining-sources-and-sinks-with-simplified-api

val future = Source.combine(s1,s2)(Merge(_)) 
.groupBy(Integer.MAX_VALUE,_._1).map(t => (t._1,List(t._2))) 
.reduce((v1,v2)=>(v1._1, v1._2 ::: v2._2)).mergeSubstreams 
.runWith(Sink.seq) 

future.onComplete{ 
    case Success(s) => ... 
    case Failure(f) => ... 
} 

如果您需要更复杂的,涉及更多阶段的东西,Graphs可以提供帮助。