0
我试图使用akka-http为了向单个主机发送http请求(例如“akka.io”)。问题在于创建的流(Http()。cachedHostConnectionPool)仅在发出N个http请求后才开始发送响应,其中N等于max-connections。为什么akka http不会针对前N个请求发出响应?
import scala.util.Failure
import scala.util.Success
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.Uri.apply
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
object ConnectionPoolExample extends App {
implicit val system = ActorSystem()
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()
val config = ConfigFactory.load()
val connectionPoolSettings = ConnectionPoolSettings(config).withMaxConnections(10)
lazy val poolClientFlow = Http().cachedHostConnectionPool[Unit]("akka.io", 80, connectionPoolSettings)
val fakeSource = Source.fromIterator[Unit] {() => Iterator.continually { Thread.sleep(1000);() } }
val requests = fakeSource.map { _ => println("Creating request"); HttpRequest(uri = "/") -> (()) }
val responses = requests.via(poolClientFlow)
responses.runForeach {
case (tryResponse, jsonData) =>
tryResponse match {
case Success(httpResponse) =>
httpResponse.entity.dataBytes.runWith(Sink.ignore)
println(s"status: ${httpResponse.status}")
case Failure(e) => {
println(e)
}
}
}
}
输出看起来是这样的:
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
status: 200 OK
Creating request
status: 200 OK
Creating request
status: 200 OK
...
我不能尽快找到任何配置参数,这将允许发射的反应,因为他们已经准备好,而不是当池超出免费连接。
谢谢!
谢谢罗兰。这个特殊的例子通过使用Source.tick来解决。在这个fakeSource中使用Thread.sleep(1000)是不幸的。真正的来源是从卡夫卡读取,它通过扩展GraphStage [SourceShape [A]] ... 'val stream = consumerMap.getOrElse(topicName,List())来实现。重写DEF onPull():单位= {。 VAL jsonData = JsonParser(stream.head.message())的ConvertTo [A] 推(下,jsonData) } })' ... 是它也阻断客户端? – uladzimir
是的,您需要在该源上添加'.async'以将其与其余流相分离。我们也在研究适当的卡夫卡整合,参见反应卡夫卡。 –