2017-02-23 77 views
1

这是一个非常简单的,使用GraphDSL API的新手问题。我读了几个相关的SO线程和我没有看到答案:Akka Streams:如何从GraphDSL API获取物化接收器输出?

val actorSystem = ActorSystem("QuickStart") 
val executor = actorSystem.dispatcher 
val materializer = ActorMaterializer()(actorSystem) 

val source: Source[Int, NotUsed] = Source(1 to 5) 
val throttledSource = source.throttle(1, 1.second, 1, ThrottleMode.shaping) 
val intDoublerFlow = Flow.fromFunction[Int, Int](i => i * 2) 
val sink = Sink.foreach(println) 

val graphModel = GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 

    throttledSource ~> intDoublerFlow ~> sink 

    // I presume I want to change this shape to something else 
    // but I can't figure out what it is. 
    ClosedShape 
} 
// TODO: This is RunnableGraph[NotUsed], I want RunnableGraph[Future[Done]] that gives the 
// materialized Future[Done] from the sink. I presume I need to use a GraphDSL SourceShape 
// but I can't get that working. 
val graph = RunnableGraph.fromGraph(graphModel) 

// This works and gives me the materialized sink output using the simpler API. 
// But I want to use the GraphDSL so that I can add branches or junctures. 
val graphThatIWantFromDslAPI = throttledSource.toMat(sink)(Keep.right) 

回答

4

诀窍是通过你想要物化值(在你的情况下,sink)到GraphDSL.create的阶段。您作为第二个参数传递的函数也会发生变化,需要在您的图形中使用Shape输入参数(下例中为s)。

val graphModel: Graph[ClosedShape, Future[Done]] = GraphDSL.create(sink) { implicit b => s => 
    import GraphDSL.Implicits._ 

    throttledSource ~> intDoublerFlow ~> s 

    // ClosedShape is just fine - it is always the shape of a RunnableGraph 
    ClosedShape 
    } 
    val graph: RunnableGraph[Future[Done]] = RunnableGraph.fromGraph(graphModel) 

更多信息可在docs找到。

+0

我upvoted你的文档参考;-) –

+1

ahh殴打到终点;)打得好 –

+0

感谢你们俩。当我将接收器添加到GraphDSL.create(sink)调用时,ClosedShape会收到编译器错误。我如何更新? – clay

3
val graphModel = GraphDSL.create(sink) { implicit b: Builder[Future[Done]] => sink => 
    import akka.stream.scaladsl.GraphDSL.Implicits._ 

    throttledSource ~> intDoublerFlow ~> sink 

    ClosedShape 
} 
val graph: RunnableGraph[Future[Done]] = RunnableGraph.fromGraph(graphModel)  
val graphThatIWantFromDslAPI: RunnableGraph[Future[Done]] = throttledSource.toMat(sink)(Keep.right) 

与GraphDSL API的问题是,隐式生成器负荷过重。您需要将接收器包装在create中,这会将Builder[NotUsed]变成Builder[Future[Done]],并且现在代表builder => sink => shape而不是builder => shape的功能。

+0

谢谢。当我将sink参数添加到'GraphDSL.create'时,'ClosedShape'行会得到一个新的编译器错误。任何想法如何更新? – clay

+0

对不起,在另一个答案上回答:-) –