0
我正在尝试编写Akka流图。我写的代码是将流的输出提供给Akka Streams图中的广播
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink1, sink2)((_, _)) { implicit builder =>
(sink1, sink2) =>
import GraphDSL.Implicits._
val bcast = builder.add(Broadcast[Row](2))
val flow = source ~> flow1 ~> flow2
flow.out ~> bcast.in
bcast.out(0) ~> sink1
bcast.out(1) ~> flow3 ~> flow4 ~> sink2
ClosedShape
})
val (f1, f2) = graph.run()
val consolidated = Future.sequence(List(f1, f2))
Await.result(consolidated, Duration.Inf)
此代码无法编译,因为我无法将流出流连接到bcast中。
我可以连接到bcast的源头,但我不能这样做,因为某些部分在两个分支之间是共同的。所以我必须在flow2之后创建图中的分支
另外...我不确定是否正确地编写了图形,因为它返回了两个未完成的未来,我需要将它们手动合并为一个单独的未来使用序列。