2015-10-14 100 views
4

我有这个粗略的测试示例与akka-http客户端和服务器。akka-http发送连续分块http响应(流)

Server.scala:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.Sink 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.HttpMethods._ 
import akka.http.scaladsl.model._ 
import scala.concurrent.Future 

class Server extends Runnable { 

    def run() = { 

     implicit val system = ActorSystem("server") 
     implicit val materializer = ActorMaterializer() 

     val serverSource = Http().bind(interface = "localhost", port = 8200) 

     val requestHandler: HttpRequest => HttpResponse = { 
      case HttpRequest(GET, Uri.Path("/stream"), _, _, _) => 
       HttpResponse(entity = HttpEntity(MediaTypes.`text/plain`, "test")) 
     } 

     val bindingFuture: Future[Http.ServerBinding] = serverSource.to(Sink.foreach { connection => 
      connection handleWithSyncHandler requestHandler 
     }).run() 

    } 

} 

Client.scala:

import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.{Uri, HttpRequest} 
import akka.stream.ActorMaterializer 

object Client extends App { 

    implicit val system = ActorSystem("client") 
    import system.dispatcher 

    new Thread(new Server).start() 

    implicit val materializer = ActorMaterializer() 
    val source = Uri("http://localhost:8200/stream") 
    val finished = Http().singleRequest(HttpRequest(uri = source)).flatMap { response => 
     response.entity.dataBytes.runForeach { chunk => 
      println(chunk.utf8String) 
     } 
    } 

} 

目前Server只是一个 “测试” 的答复。

如何更改Server中的HttpResponse以每隔1秒的无限循环发送“测试”为分块(流)?

回答

4

找到了答案。

Server.scala:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Source, Sink} 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.HttpMethods._ 
import akka.http.scaladsl.model._ 
import scala.concurrent.Future 
import scala.concurrent.duration._ 

class Server extends Runnable { 

    def run() = { 

     implicit val system = ActorSystem("server") 
     implicit val materializer = ActorMaterializer() 

     val serverSource = Http().bind(interface = "localhost", port = 8200) 

     val requestHandler: HttpRequest => HttpResponse = { 
      case HttpRequest(GET, Uri.Path("/stream"), _, _, _) => 
       HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, Source(0 seconds, 1 seconds, "test"))) 
     } 

     val bindingFuture: Future[Http.ServerBinding] = serverSource.to(Sink.foreach { connection => 
      connection handleWithSyncHandler requestHandler 
     }).run() 

    } 

}