2016-09-16 63 views
3

将HttpEntity转换为字符串后的延迟我使用akka-http具有相当短的响应时间目标的Web应用程序。我使用路由DSL completeWithFuture方法与一个CompletableFuture的链。什么导致通过toStrict()

我注意到,在使用CompletionStage方法的XXXasync变体并传递相同的执行程序来链接每个将来时,用于处理阶段的线程可以任意更改,导致对于某些请求的响应时间更长,以防所有线程使用指定的执行程序。因此,我将自定义执行程序传递给第一个CompletableFuture,并将所有后续阶段与正常变体链接起来,以便为它们使用相同的线程。

问题:一阶段通过HttpEntity.toStrict()确实的HttpEntity为String的转化率和该方法使用从akka.actor.default-dispatcher一个线程。随着工作量的增加,越来越多的请求会在下一阶段开始时超过所需的响应时间,尽管将超时时间设置为toStrict的方式低于目标响应时间,并且没有超时异常。

简化的代码

private Route handleRequest(final HttpRequest request) { 
    return completeWithFuture(CompletableFuture.runAsync(() -> preprocessing(), systemDispatcher) // Dispatcher 1 
      .thenCompose((preprocessingResult) -> // please ignore that preprocessingResult is not used in that simplified version 
      entityToString(request.entity()).thenApply((requestString) -> generateResponse(requestString)))); 
} 

public CompletionStage<String> entityToString(final HttpEntity entity) { 
     long start = System.nanoTime(); 
     return entity.toStrict(bodyReadTimeoutMillis, materializer).thenApplyAsync((final Strict strict) -> { 
      System.out.println(start-System.nanoTime()); // varies between <1ms and >500ms 
      return strict.getData().utf8String(); 
     }, systemDispatcher); // Dispatcher 2 
    } 

所以我的猜测是,从我的自定义执行的线程akkas默认的演员调度线程之一和背面的开关引起的问题。

问题:对于我的entityToString方法中的延迟是否有另一种解释?是否有一种方法可以实现与toStrict相同的操作,即将整个可能的分块消息体作为字符串获取,同时避免多次切换线程?

请注意,我需要toStrict方法的超时功能来中止处理缓慢的POST请求。

UPDATE

它在过去几天的思考,我认为这是不可能实现无阻塞的阅读,阿卡担保,无需切换线程。所以真正的问题是在toStrict之后的调度可能导致明显的高延迟。

我试图用不同的调度程序(见注释分派器1/分派器2在上面的代码)和记录的居民在延迟超过50ms的情况下计数分派器2的。我找不到合适的文档,但我认为这是计划任务的数量。 我运行了10000个请求,200个并发连接的Apache平台,并获得55次超过50ms的延迟。输出显示最多有80位居民。我在Amazons m3.2xlarge实例(8个vCPU,30GB RAM,Ubuntu 16.04)上运行了这个测试,没有其他进程消耗任何明显数量的cpu)。调度程序的类型为fork-join-executor,并行度因子= 1。

具有更多可变数量并发请求的实际流量会导致请求超出限制(高达50%)的速率不断增加。

处理请求的平均时间低于1ms。toStrict后如何避免频繁的延迟以及如何避免它?

+0

(1)当toStrict将数据收集到一个ByteString中时,它可能需要等待来自网络的数据。所以,这将取决于客户端实际发送完整请求需要多长时间。 (2)如果在8个内核上运行200个并发请求,则意味着每个请求只能获得一小部分CPU时间,因此总体执行时间至少会增加该因子。 – jrudolph

+0

在大多数现实世界中,如果您不控制客户端,则必须应用更高的超时,因为访问您站点的客户端发送数据可能会很慢。在任何情况下,如果您可以以流方式处理传入数据会更好,因此1)您不需要使用toStrict 2)在处理它之前不需要缓存所有传入数据。 – jrudolph

+0

在我目前的情况下,我在实时拍卖中出价,并且回复在50ms后无效。不幸的是,有时候我收到了不完整的http POST bodys。等待遗漏的部分必须中止,以便为请求处理释放资源。我尝试使用流处理传入数据,但无法弄清楚1)如何用流回答http请求2)在50ms后停止处理请求3)在遇到太多请求时应用tcp反压。该请求包含一个复杂的JSON对象,需要在可以处理之前进行解析 – teano

回答

1

您可以尝试使用akka-streams获取实体内容,并以不同的方式实现超时要求而不是实体读取。

entity.getDataBytes().runFold(ByteString.empty(), ByteString::concat, materializer) 
.thenCompose(r -> r.utf8String()); 
+0

我试过了,但流也使用'akka.actor.default-dispatcher'。我认为'thenCompose'应该是'thenApply'。无论如何,它总是使用另一个线程池中的线程。由于在你的代码中没有指定执行程序,它使用来自'ForkJoinPool.commonPool'的线程。我也不知道如何停止流并关闭连接,比如'toStrict',如果我可以以某种方式实现超时。最后,我认为entity.getDataBytes()不会收集消息体的所有可能的部分,如'toStrict'文档中所述。 – teano

+0

您是否尝试使用配置为使用自定义调度程序的actor的上下文创建实现程序? – Tiago

+0

同样的结果。在最后几天考虑它,我相信不可能实现无阻塞读取,即akka保证,不切换线程。所以真正的问题是'toStrict'之后的调度带来的明显的高延迟。 – teano

相关问题