2017-04-12 82 views
0

比方说,我们有方法返回Future[T]或java CompletableFuture[T]或自定义AsyncCompletionHandler[T]org.asynchttpclient。我想节流全部调用这种方法。如何实现对支持异步回调的方法的调用的节流

你会怎么做?目前我使用MergeHub.source为基础的Sink通过它漏斗的所有请求。问题我有

  1. 有没有更好的办法?
  2. 在我的日志输出中,我看到花费在所有请求上的时间少于我的预期。为什么?

下面是代码

import java.time.ZonedDateTime 

import akka.actor.ActorSystem 
import akka.stream.scaladsl.{MergeHub, Sink, Source} 
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, ThrottleMode} 
import org.asynchttpclient.{DefaultAsyncHttpClient, _} 

import scala.concurrent.duration._ 
import scala.concurrent.{Await, Future, Promise} 
import scala.language.postfixOps 
import scala.util.{Failure, Success, Try} 

object Main { 

    private implicit val system = ActorSystem("root") 
    private implicit val executor = system.dispatcher 
    private implicit val mat = ActorMaterializer(ActorMaterializerSettings(system)) 

    type PendingRequest =() => Future[Try[Response]] 

    private val throttlingSink = 
    MergeHub.source[PendingRequest] 
     .throttle(1, FiniteDuration(2000, MILLISECONDS), 1, ThrottleMode.Shaping) 
     .mapAsync(4)(_.apply()) 
     .to(Sink.ignore) 
     .run() 

    def wrap(p: Promise[Try[Response]]): AsyncCompletionHandler[Response] = new AsyncCompletionHandler[Response] { 
    override def onThrowable(t: Throwable): Unit = 
     p.success(Failure(t)) 

    override def onCompleted(response: Response): Response = { 
     p.success(Success(response)) 
     response 
    } 
    } 

    def makeRequest(url: String): Future[Response] = { 

    val p = Promise[Try[Response]] 

    Source.single[PendingRequest](() => { 
     asyncHttpClient 
     .prepareGet(url) 
     .execute(wrap(p)) 

     p.future 
    }) 
     .runWith(throttlingSink) 

    p.future.flatMap { 
     case Success(r) => Future.successful(r) 
     case Failure(ex) => Future.failed(ex) 
    } 
    } 

    val asyncHttpClient = new DefaultAsyncHttpClient() 

    def main(args: Array[String]): Unit = { 

    val start = ZonedDateTime.now() 
    println("Start!") 
    Source(1 to 20) 
     .mapAsync(4) { index => 
     println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} s - Requesting $index") 
     makeRequest(s"https://httpbin.org/get?param=$index").map { r => 
      println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} s - Got $index - Code ${r.getStatusCode}") 
     } 
     } 
     .runWith(Sink.ignore) 
     .onComplete { 
     case Success(_) => 
      println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} Done!") 
      asyncHttpClient.close() 
      system.terminate() 
     case Failure(ex) => 
      ex.printStackTrace() 
      asyncHttpClient.close() 
      system.terminate() 
     } 

    Await.result(system.whenTerminated, Duration.Inf) 
    } 
} 

换句话说也有多处类似的主要内容。所有这些都应该作为调用的总和来节制。

回答

1

作为一般性评论,您可能可以不使用MergeHub步骤并简化您的管道。见下面的例子

object Main { 

    private implicit val system = ActorSystem("root") 
    private implicit val executor = system.dispatcher 
    private implicit val mat = ActorMaterializer(ActorMaterializerSettings(system)) 

    def makeRequest(url: String): Future[Response] = { 
    val promise = Promise[Response]() 
    asyncHttpClient.prepareGet(url).execute(new AsyncCompletionHandler[Response] { 
     def onCompleted(response: Response) = { 
     promise.success(response) 
     response 
     } 
     override def onThrowable(t: Throwable) { 
     promise.failure(t) 
     super.onThrowable(t) 
     } 
    }) 
    promise.future 
    } 

    val asyncHttpClient = new DefaultAsyncHttpClient() 

    def main(args: Array[String]): Unit = { 

    val start = ZonedDateTime.now() 
    println("Start!") 
    Source(1 to 20) 
     .throttle(1, FiniteDuration(2000, MILLISECONDS), 1, ThrottleMode.Shaping) 
     .mapAsync(4) { index => 
     println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} s - Requesting $index") 
     makeRequest(s"http://httpbin.org/get?param=$index").map { r => 
      println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} s - Got $index - Code ${r.getStatusCode}") 
     } 
     } 
     .runWith(Sink.ignore) 
     .onComplete { 
     case Success(_) => 
      println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} Done!") 
      asyncHttpClient.close() 
      system.terminate() 
     case Failure(ex) => 
      ex.printStackTrace() 
      asyncHttpClient.close() 
      system.terminate() 
     } 

    Await.result(system.whenTerminated, Duration.Inf) 
    } 
} 

然而,在两种实施方式中我看到请求正确节流 - 一个每2秒,从第二〜0大致开始第二〜38。

您能详细说明您在这里的期望吗?

+0

谢谢你,斯特凡诺! 'MergeHub'的目标是限制所有对REST API的调用。换句话说,像'main'的内容有多个地方。所有这些都应该作为调用的总和来节制。 – expert

+0

好的,但即使使用'MergeHub',我可以看到节流大致起作用,每2秒打一次电话。你期望看到什么? –

+0

它或多或少的作用,但我想知道是否有可能更精确地解决节流。正如你在给定的例子中正确地注意到的那样,它花费了38秒,而不是我预期的40秒。 – expert