3
我想用Akka HTTP建立一个REST服务,连接到现有的宿(使用Kafka反应流),但我无法弄清楚如何将HTTP流链接到Akka流接收器。将Akka HTTP连接到Akka流
我应该选择使用Flow的低级Akka HTTP API吗?
我的要求是有:
- 背压的完整流程
- 200响应代码时,所有事情都是由卡夫卡承认下沉
- 500时,背压过高?可能吗 ?
这里是我的代码当前代码
// flow to split group of lines into lines
val splitLines = Flow[String].mapConcat(_.split("\n").toList)
// sink to produce kafka records in kafka
val kafkaSink = Flow[String]
.map(new ProducerRecord[Array[Byte], String](topic, _))
.toMat(Producer.plainSink(ProducerSettings(system,new ByteArraySerializer, new StringSerializer)))(Keep.right)
val routes = {
path("ingest") {
post {
logger.info("starting ingestion")
entity(as[GenericEvent]) { eventIngest =>
????
}~
entity(as[GenericEventList]) { eventIngestList =>
????
}
}
}
}
Http(actorSystem).bindAndHandle(routes, config.httpInterface, config.httpPort)
是否有任何方式解除与实体流(作为[事件])? – vgkowski
是的,你可以随时对卡夫卡水槽进行解组活动。我在回答中加入了另一个例子 –
这是Akka HTTP流程中的一个新流程,不会引入一些无用的开销? – vgkowski