如何从Akka HTTP路由发送元素/消息到Akka接收器?我的HTTP路由仍然需要返回一个正常的HTTP响应。akka-http:从http路由发送元素到akka接收器
我想这需要一个流分支/结。正常的HTTP路由是来自HttpRequest - > HttpResponse的流。我想添加一个分支/联结,以便HttpRequests可以触发事件到我单独的接收器以及生成正常的HttpResponse。
下面是一个非常简单的单路akka-http应用程序。为了简单起见,我使用了一个简单的println接收器。我的生产用例显然会涉及一个不太平凡的下沉。
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem("my-akka-http-test")
val executor = actorSystem.dispatcher
implicit val materializer = ActorMaterializer()(actorSystem)
// I would like to send elements to this sink in response to HTTP GET operations.
val sink: Sink[Any, Future[Done]] = Sink.foreach(println)
val route: akka.http.scaladsl.server.Route =
path("hello"/Segment) { p =>
get {
// I'd like to send a message to an Akka Sink as well as return an HTTP response.
complete {
s"<h1>Say hello to akka-http. p=$p</h1>"
}
}
}
val httpExt: akka.http.scaladsl.HttpExt = Http(actorSystem)
val bindingFuture = httpExt.bindAndHandle(RouteResult.route2HandlerFlow(route), "localhost", 8080)
println("Server online at http://localhost:8080/")
println("Press RETURN to stop...")
scala.io.StdIn.readLine()
bindingFuture
.flatMap(_.unbind())(executor) // trigger unbinding from the port
.onComplete(_ => Await.result(actorSystem.terminate(), Duration.Inf))(executor) // and shutdown when done
}
编辑:或者在使用低级别阿卡-HTTP API,我怎么能发送特定消息到沉从特定路由处理?
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem("my-akka-http-test")
val executor = actorSystem.dispatcher
implicit val materializer = ActorMaterializer()(actorSystem)
// I would like to send elements to this sink in response to HTTP GET operations.
val sink: Sink[Any, Future[Done]] = Sink.foreach(println)
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
HttpResponse(entity = HttpEntity(
ContentTypes.`text/html(UTF-8)`,
"<html><body>Hello world!</body></html>"))
case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
HttpResponse(entity = "PONG!")
case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
sys.error("BOOM!")
case r: HttpRequest =>
r.discardEntityBytes() // important to drain incoming HTTP Entity stream
HttpResponse(404, entity = "Unknown resource!")
}
val serverSource = Http().bind(interface = "localhost", port = 8080)
val bindingFuture: Future[Http.ServerBinding] =
serverSource.to(Sink.foreach { connection =>
println("Accepted new connection from " + connection.remoteAddress)
connection handleWithSyncHandler requestHandler
// this is equivalent to
// connection handleWith { Flow[HttpRequest] map requestHandler }
}).run()
println("Server online at http://localhost:8080/")
println("Press RETURN to stop...")
scala.io.StdIn.readLine()
bindingFuture
.flatMap(_.unbind())(executor) // trigger unbinding from the port
.onComplete(_ => Await.result(actorSystem.terminate(), Duration.Inf))(executor) // and shutdown when done
}
您的建议添加将整个HttpRequest发送到接收器,我想只是一个特定的路由发送特定的消息到接收器。 – clay
gotcha,我添加了更多信息 –
谢谢!你能告诉我怎么用低级别的API来做到这一点吗? – clay