2017-02-27 99 views
3

我想用Akka HTTP建立一个REST服务,连接到现有的宿(使用Kafka反应流),但我无法弄清楚如何将HTTP流链接到Akka流接收器。将Akka HTTP连接到Akka流

我应该选择使用Flow的低级Akka HTTP API吗?

我的要求是有:

  • 背压的完整流程
  • 200响应代码时,所有事情都是由卡夫卡承认下沉
  • 500时,背压过高?可能吗 ?

这里是我的代码当前代码

// flow to split group of lines into lines 
    val splitLines = Flow[String].mapConcat(_.split("\n").toList) 

// sink to produce kafka records in kafka 
val kafkaSink = Flow[String] 
    .map(new ProducerRecord[Array[Byte], String](topic, _)) 
    .toMat(Producer.plainSink(ProducerSettings(system,new ByteArraySerializer, new StringSerializer)))(Keep.right) 

val routes = { 
    path("ingest") { 
     post { 
     logger.info("starting ingestion") 
     entity(as[GenericEvent]) { eventIngest => 
      ????  
     }~ 
     entity(as[GenericEventList]) { eventIngestList => 
      ???? 
     } 
     } 
    } 
    } 

Http(actorSystem).bindAndHandle(routes, config.httpInterface, config.httpPort) 

回答

2

还有的要对此有几种方法。一个建议可能是将数据直接从您的请求实体传输到您的kafka接收器中。 extractDataBytes指令可以帮助你做到这一点(更多信息here)。

尝试一下下面的例子。我添加了???流程以允许您的特定于案例的转换来正确拆分/转换您的请求实体字节。您可以使用类似Framing.delimiter来分割实体字节流(更多信息here)。

(extractDataBytes & extractMaterializer) { (byteSrc, mat) => 
    val f = byteSrc.via(???).runWith(kafkaSink)(mat) 
    onComplete(f){ 
     case Success(value) => complete(s"OK") 
     case Failure(ex) => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}")) 
    } 
    } 

另外,如果你想解组的实体一些领域对象,你可以这样做

(entity(as[Event]) & extractMaterializer) { (event, mat) => 
    val f = Source.single(event).via(???).runWith(kafkaSink)(mat) 
    onComplete(f){ 
     case Success(value) => complete(s"OK") 
     case Failure(ex) => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}")) 
    } 
    } 

来你的最后一个问题,应该卡夫卡背压,你流将永远不会完成。您应该会在服务器给你回500所配置的请求超时后(引用下面的文档):

一个默认请求超时全局应用到所有路线和可 配置为使用akka.http .server.request-timeout设置(其中 默认为20秒)。

+0

是否有任何方式解除与实体流(作为[事件])? – vgkowski

+0

是的,你可以随时对卡夫卡水槽进行解组活动。我在回答中加入了另一个例子 –

+0

这是Akka HTTP流程中的一个新流程,不会引入一些无用的开销? – vgkowski