2017-02-24 68 views
2

如何从Akka HTTP路由发送元素/消息到Akka接收器?我的HTTP路由仍然需要返回一个正常的HTTP响应。akka-http:从http路由发送元素到akka接收器

我想这需要一个流分支/结。正常的HTTP路由是来自HttpRequest - > HttpResponse的流。我想添加一个分支/联结,以便HttpRequests可以触发事件到我单独的接收器以及生成正常的HttpResponse。

下面是一个非常简单的单路akka-http应用程序。为了简单起见,我使用了一个简单的println接收器。我的生产用例显然会涉及一个不太平凡的下沉。

def main(args: Array[String]): Unit = { 
    implicit val actorSystem = ActorSystem("my-akka-http-test") 
    val executor = actorSystem.dispatcher 
    implicit val materializer = ActorMaterializer()(actorSystem) 

    // I would like to send elements to this sink in response to HTTP GET operations. 
    val sink: Sink[Any, Future[Done]] = Sink.foreach(println) 

    val route: akka.http.scaladsl.server.Route = 
    path("hello"/Segment) { p => 
     get { 
     // I'd like to send a message to an Akka Sink as well as return an HTTP response. 
     complete { 
      s"<h1>Say hello to akka-http. p=$p</h1>" 
     } 
     } 
    } 

    val httpExt: akka.http.scaladsl.HttpExt = Http(actorSystem) 
    val bindingFuture = httpExt.bindAndHandle(RouteResult.route2HandlerFlow(route), "localhost", 8080) 

    println("Server online at http://localhost:8080/") 
    println("Press RETURN to stop...") 
    scala.io.StdIn.readLine() 

    bindingFuture 
    .flatMap(_.unbind())(executor) // trigger unbinding from the port 
    .onComplete(_ => Await.result(actorSystem.terminate(), Duration.Inf))(executor) // and shutdown when done 
} 

编辑:或者在使用低级别阿卡-HTTP API,我怎么能发送特定消息到沉从特定路由处理?

def main(args: Array[String]): Unit = { 
    implicit val actorSystem = ActorSystem("my-akka-http-test") 
    val executor = actorSystem.dispatcher 
    implicit val materializer = ActorMaterializer()(actorSystem) 

    // I would like to send elements to this sink in response to HTTP GET operations. 
    val sink: Sink[Any, Future[Done]] = Sink.foreach(println) 

    val requestHandler: HttpRequest => HttpResponse = { 
    case HttpRequest(GET, Uri.Path("/"), _, _, _) => 
     HttpResponse(entity = HttpEntity(
     ContentTypes.`text/html(UTF-8)`, 
     "<html><body>Hello world!</body></html>")) 

    case HttpRequest(GET, Uri.Path("/ping"), _, _, _) => 
     HttpResponse(entity = "PONG!") 

    case HttpRequest(GET, Uri.Path("/crash"), _, _, _) => 
     sys.error("BOOM!") 

    case r: HttpRequest => 
     r.discardEntityBytes() // important to drain incoming HTTP Entity stream 
     HttpResponse(404, entity = "Unknown resource!") 
    } 

    val serverSource = Http().bind(interface = "localhost", port = 8080) 

    val bindingFuture: Future[Http.ServerBinding] = 
    serverSource.to(Sink.foreach { connection => 
     println("Accepted new connection from " + connection.remoteAddress) 

     connection handleWithSyncHandler requestHandler 
     // this is equivalent to 
     // connection handleWith { Flow[HttpRequest] map requestHandler } 
    }).run() 

    println("Server online at http://localhost:8080/") 
    println("Press RETURN to stop...") 
    scala.io.StdIn.readLine() 

    bindingFuture 
    .flatMap(_.unbind())(executor) // trigger unbinding from the port 
    .onComplete(_ => Await.result(actorSystem.terminate(), Duration.Inf))(executor) // and shutdown when done 
} 

回答

2

如果你想整个HttpRequest发送给你的一个水槽,我想说的最简单的方法是使用alsoTo组合子。其结果将是沿着

​​

仅供参考线的东西:alsoTo其实隐藏着Broadcast阶段。

IF相反,您需要选择性地将消息从特定的子路由发送到接收器,您别无选择,只能为每个传入请求实现新的流。见下面的例子

val sink: Sink[Any, Future[Done]] = Sink.foreach(println) 

val route: akka.http.scaladsl.server.Route = 
    path("hello"/Segment) { p => 
    get { 

     (extract(_.request) & extractMaterializer) { (req, mat) ⇒ 
     Source.single(req).runWith(sink)(mat) 

     complete { 
      s"<h1>Say hello to akka-http. p=$p</h1>" 
     } 
     } 
    } 
    } 

而且,记住,你总是可以完全抛弃高层DSL,并使用lower-level streams DSL模型,你整条路线。这将导致更详细的代码 - 但会给你完全控制你的流实现。

编辑:例如低于

val sink: Sink[Any, Future[Done]] = Sink.foreach(println) 

val handlerFlow = 
    Flow.fromGraph(GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 

    val partition = b.add(Partition[HttpRequest](2, { 
     case HttpRequest(GET, Uri.Path("/"), _, _, _) ⇒ 0 
     case _          ⇒ 1 
    })) 
    val merge = b.add(Merge[HttpResponse](2)) 

    val happyPath = Flow[HttpRequest].map{ req ⇒ 
     HttpResponse(entity = HttpEntity(
     ContentTypes.`text/html(UTF-8)`, 
     "<html><body>Hello world!</body></html>")) 
    }   

    val unhappyPath = Flow[HttpRequest].map{ 
     case HttpRequest(GET, Uri.Path("/ping"), _, _, _) => 
     HttpResponse(entity = "PONG!") 

     case HttpRequest(GET, Uri.Path("/crash"), _, _, _) => 
     sys.error("BOOM!") 

     case r: HttpRequest => 
     r.discardEntityBytes() // important to drain incoming HTTP Entity stream 
     HttpResponse(404, entity = "Unknown resource!") 
    } 

    partition.out(0).alsoTo(sink) ~> happyPath ~> merge 
    partition.out(1)    ~> unhappyPath ~> merge 

    FlowShape(partition.in, merge.out) 
    }) 

val bindingFuture = Http().bindAndHandle(handlerFlow, "localhost", 8080) 
+0

您的建议添加将整个HttpRequest发送到接收器,我想只是一个特定的路由发送特定的消息到接收器。 – clay

+0

gotcha,我添加了更多信息 –

+0

谢谢!你能告诉我怎么用低级别的API来做到这一点吗? – clay

0

这是我用似乎理想的解决方案。阿卡Http似乎是这样设计的,以便您的路由很简单HttpRequest-> HttpResponse流并且不涉及任何额外的分支。

与其将所有内容都构建到单个Akka流图中,我有一个单独的QueueSource-> Sink图,正常的Akka Http HttpRequest-> HttpResponse流只是根据需要向源队列添加元素。

object HttpWithSinkTest { 
    def buildQueueSourceGraph(): RunnableGraph[(SourceQueueWithComplete[String], Future[Done])] = { 
    val annotateMessage: Flow[String, String, NotUsed] = Flow.fromFunction[String, String](s => s"got message from queue: $s") 

    val sourceQueue = Source.queue[String](100, OverflowStrategy.dropNew) 
    val sink: Sink[String, Future[Done]] = Sink.foreach(println) 
    val annotatedSink = annotateMessage.toMat(sink)(Keep.right) 
    val queueGraph = sourceQueue.toMat(annotatedSink)(Keep.both) 

    queueGraph 
    } 

    def buildHttpFlow(queue: SourceQueueWithComplete[String], 
        actorSystem: ActorSystem, materializer: ActorMaterializer): Flow[HttpRequest, HttpResponse, NotUsed] = { 
    implicit val actorSystemI = actorSystem 
    implicit val materializerI = materializer 

    val route: akka.http.scaladsl.server.Route = 
     path("hello"/Segment) { p => 
     get { 
      complete { 
      queue.offer(s"got http event p=$p") 

      s"<h1>Say hello to akka-http. p=$p</h1>" 
      } 
     } 
     } 

    val routeFlow = RouteResult.route2HandlerFlow(route) 

    routeFlow 
    } 

    def main(args: Array[String]): Unit = { 
    val actorSystem = ActorSystem("my-akka-http-test") 
    val executor = actorSystem.dispatcher 
    implicit val materializer = ActorMaterializer()(actorSystem) 

    val (queue, _) = buildQueueSourceGraph().run()(materializer) 

    val httpFlow = buildHttpFlow(queue, actorSystem, materializer) 
    val httpExt: akka.http.scaladsl.HttpExt = Http(actorSystem) 
    val bindingFuture = httpExt.bindAndHandle(httpFlow, "localhost", 8080) 

    println("Server online at http://localhost:8080/") 
    println("Press RETURN to stop...") 
    scala.io.StdIn.readLine() 

    println("Shutting down...") 

    val serverBinding = Await.result(bindingFuture, Duration.Inf) 
    Await.result(serverBinding.unbind(), Duration.Inf) 
    Await.result(actorSystem.terminate(), Duration.Inf) 

    println("Done. Exiting") 
    } 
}