2017-08-30 47 views
0

我正在尝试编写Akka流图。我写的代码是将流的输出提供给Akka Streams图中的广播

val graph = RunnableGraph.fromGraph(GraphDSL.create(sink1, sink2)((_, _)) { implicit builder => 
    (sink1, sink2) => 
     import GraphDSL.Implicits._ 
     val bcast = builder.add(Broadcast[Row](2)) 
     val flow = source ~> flow1 ~> flow2 
     flow.out ~> bcast.in 
     bcast.out(0) ~> sink1 
     bcast.out(1) ~> flow3 ~> flow4 ~> sink2 
     ClosedShape 
}) 

val (f1, f2) = graph.run() 
val consolidated = Future.sequence(List(f1, f2)) 
Await.result(consolidated, Duration.Inf) 

此代码无法编译,因为我无法将流出流连接到bcast中。

我可以连接到bcast的源头,但我不能这样做,因为某些部分在两个分支之间是共同的。所以我必须在flow2之后创建图中的分支

另外...我不确定是否正确地编写了图形,因为它返回了两个未完成的未来,我需要将它们手动合并为一个单独的未来使用序列。

回答

1

由于~>组合器不会给你一个流,所以你不能连线你的图表。它实际上是一种有状态的声明式操作。

这里的一个更好的方法是一次连线你的图形,

source ~> flow1 ~> flow2 ~> bcast 
           bcast   ~>   sink1 
           bcast ~> flow3 ~> flow4 ~> sink2 

或者,您也可以通过向构建器添加一个舞台(并检索其形状)来拆分声明,例如,

val flow2s = builder.add(flow2) 

    source ~> flow1 ~> flow2s.in 
    flow2s.out ~> bcast 
       bcast   ~>   sink1 
       bcast ~> flow3 ~> flow4 ~> sink2 

关于物化Future S,你需要选择什么是有意义,因为你的图作为一个整体的物化价值。如果您只需要其中一个物理Future s,则只需将其中一个传递给GraphDSL.create方法。 如果否则您对Future s都感兴趣,那么将sequencezip合在一起非常合适。