2017-02-27 117 views
3

的夫妇后,我有一对夫妇的运行小时后挂起的HTTP连接池:阿卡HTTP连接池挂起时间

private def createHttpPool(host: String): SourceQueue[(HttpRequest, Promise[HttpResponse])] = { 
    val pool = Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host) 
    Source.queue[(HttpRequest, Promise[HttpResponse])](config.poolBuffer, OverflowStrategy.dropNew) 
     .via(pool).toMat(Sink.foreach { 
     case ((Success(res), p)) => p.success(res) 
     case ((Failure(e), p)) => p.failure(e) 
     })(Keep.left).run 
    } 

我排队的项目:

private def enqueue(uri: Uri): Future[HttpResponse] = { 
    val promise = Promise[HttpResponse] 
    val request = HttpRequest(uri = uri) -> promise 

    queue.offer(request).flatMap { 
     case Enqueued => promise.future 
     case _ => Future.failed(ConnectionPoolDroppedRequest) 
    } 
} 

和解决这样的响应:

private def request(uri: Uri): Future[HttpResponse] = { 
    def retry = { 
     Thread.sleep(config.dispatcherRetryInterval) 
     logger.info(s"retrying") 
     request(uri) 
    } 

    logger.info("req-start") 
    for { 
     response <- enqueue(uri) 

     _ = logger.info("req-end") 

     finalResponse <- response.status match { 
     case TooManyRequests => retry 
     case OK => Future.successful(response) 
     case _ => response.entity.toStrict(10.seconds).map(s => throw Error(s.toString, uri.toString)) 
     } 
    } yield finalResponse 
} 

如果未来成功,该函数的结果将始终转换:

def get(uri: Uri): Future[Try[JValue]] = { 
    for { 
    response <- request(uri) 
    json <- Unmarshal(response.entity).to[Try[JValue]] 
    } yield json 
} 

一切都正常工作了一段时间,然后我在日志中看到的所有内容都是req-start和no req-end。

我阿卡的配置是这样的:

akka { 
    actor.deployment.default { 
    dispatcher = "my-dispatcher" 
    } 
} 

my-dispatcher { 
    type = Dispatcher 
    executor = "fork-join-executor" 

    fork-join-executor { 
    parallelism-min = 256 
    parallelism-factor = 128.0 
    parallelism-max = 1024 
    } 
} 

akka.http { 
    host-connection-pool { 
    max-connections = 512 
    max-retries = 5 
    max-open-requests = 16384 
    pipelining-limit = 1 
    } 
} 

我不知道这是否是一个配置问题或一个代码问题。我有我的并行和连接数量如此之高,因为没有它,我得到很低的请求率(我想尽可能请求 - 我有其他速率限制代码来保护服务器)。

回答

3

您没有使用从服务器返回的响应实体。引用以下文档:

消费(或放弃)请求的实体是强制性的!如果 意外地没有被使用或丢弃Akka HTTP将假定 传入的数据应该保持反压,并且将通过TCP反压机制阻塞传入数据。无论HttpResponse的状态如何,客户端应该使用实体( )。

该实体的形式为Source[ByteString, _]需要运行以避免资源匮乏。

如果您不需要读取实体,以消耗实体最简单的方法字节丢弃它们,通过使用

res.discardEntityBytes() 

(您可以通过添加附加回调 - 例如 - .future().map(...)) 。

This page in the docs描述了所有这些替代方法,包括如何在需要时读取字节。

---编辑

经过代码/信息被提供的,很显然,资源消耗是没有问题的。在这个实现中还有另一个大红旗,即重试方法中的Thread.sleep。 这是一个阻止调用,很可能会让您的底层actor系统的线程基础架构挨饿。

docs中提供了为什么这是危险的完整解释。

尝试更改并使用akka.pattern.afterdocs)。示例如下:

def retry = akka.pattern.after(200 millis, using = system.scheduler)(request(uri)) 
+0

我实际上在获得响应后会消耗实体。我会用更多信息更新帖子。 – asuna

+0

只是将我的代码更改为使用akka.pattern.after,如果问题再次出现,将发布更新。 虽然我确实介绍了较早的Thread.sleep代码,并且分析器显示没有任何线程在停止工作时正在休眠。每当我得到429时,jvisualvm显示其中一个线程正在休眠约500ms,然后该线程再次开始运行,所以我有点怀疑使用调度程序将修复它。不过,使用Thread.sleep非常糟糕 - 感谢给我一个很好的解决方案来解决这个问题。 – asuna

+0

我遇到了同样的问题。这是线程转储:https://gist.github.com/pradyuman/bf83a8f3a293d8c679fcb6dc5f566a80 – asuna