2016-09-26 92 views
0

我有一个响应这些两个URI阿卡流链接在一起,流,每个发出HTTP请求

/posts/{id}   : get a blog post by ID 
/comments?postId={id} : get all the comments on a blog by the blog's id 

与一批博客ID的启动宁静的博客服务器(http://jsonplaceholder.typicode.com),我想创建一个流动的是执行该程序的步骤:

  1. 击中/posts端点得到的JSON后
  2. 反序列化JSON的Blog案例类
  3. 击中/comments端点每个博客和获取的评论的JSON列表为后
  4. 反序列化评论JSON到Comment箱子对象的列表
  5. 请在评论(收集状态或垃圾邮件分析)
  6. 一些处理

是的,我知道如果我有博客ID,我可以直接跳到第3步。假装我不能

我想要一堆HTTP请求去服务器在步骤1中。为了实现这一目标,我使用了cachedHostConnectionPool。以下是我迄今为止:

final case class Blog(id: Int, userId: Int, title: String, body: String) 
final case class Comment(id: Int, postId: Int, name: String, email: String, body: String) 

object AkkaStreamProcessor extends App { 
    implicit val actorSystem = ActorSystem("blogProcessor") 
    import actorSystem.dispatcher 
    implicit val flowMaterializer = ActorMaterializer() 

    private def getBlogUri(id: Integer): String = "/posts/" + id 
    private def getCommentsUri(blog: Blog): String = "/comments?postId=" + blog.id 
    private def parseBlogResponse(jsonResponse: String): Blog = Json.parse(jsonResponse).as[Blog] 
    private def parseCommentsResponse(jsonResponse: String): List[Comment] = Json.parse(jsonResponse).as[List[Comment]] 

    val pooledConnectionFlow = { 
    val connectionSettings = ConnectionPoolSettings(actorSystem) 
     .withMaxConnections(32) 
     .withMaxOpenRequests(32) 
     .withMaxRetries(3) 
    Http().cachedHostConnectionPool[Int](host = "jsonplaceholder.typicode.com", settings = connectionSettings) 
    } 

    val source = Source(1 to 32) 
    val fetchBlogsFlow = Flow[Int] 
    .map((id: Int) => (getBlogUri(id),id)) 
    .map{ case(uri:String, id:Int) => (HttpRequest(method = HttpMethods.GET, uri = uri), id) } 
    .via(pooledConnectionFlow) 
    .map { case(response: Try[HttpResponse], id:Int) => handleBlogResponse(response, id) } 
    .map((jsonText: Try[String]) => jsonText.map(j => parseBlogResponse(j))) 

    val sink = Sink.foreach[Try[Blog]](blog => blog.map(b=> println(b))) 
    source.via(fetchBlogsFlow).runWith(sink) 

    private def handleBlogResponse(response: Try[HttpResponse], id: Int): Try[String] = { 
    println(s"Received response for id $id on thread ${Thread.currentThread().getName}") 
    response.flatMap((r: HttpResponse) => { 
     r.status match { 
     case StatusCodes.OK => { 
      Success(Await.result(Unmarshal(r.entity).to[String], Duration.Inf)) 
     } 
     case _ => Failure(new RuntimeException("Invalid response : " + r.status.toString())) 
     } 
    }) 
    }  
} 

现在,我要的是创造做步骤3和4的另一个流,我会在第一流动后链。但是,我正在为来自第一个流程的讨厌的Try[Blog]输出而挣扎。如何将Try[Blog]管道化为另一个HTTP请求?有没有办法分裂管道,失败是一种方式,而成功则是另一种方式?

以下是我对第二流,但我不知道如何使链接工作,而在调用getTry

val processBlogsFlow = Flow[Try[Blog]] 
    .map((tryBlog: Try[Blog]) => tryBlog.get) 
    .map((blog: Blog) => (HttpRequest(method=HttpMethods.GET, uri=getCommentsUri(blog)), blog.id)) 
    .via(pooledConnectionFlow) 
+1

如果你想放弃失败,你可以使用'Flow.collect'。如果你想处理失败,你可以对流进行分区,例如使用'Flow.groupBy'或'分区'。当请求导致“失败”时会发生什么? – devkat

+0

它将被记录下来,并可能在稍后的批次中重新排队等待重试。 –

回答

1

有一个用于处理Try很好blog entry 。在您的具体的例子,我会保留Try这样你就可以在原来的故障信息:

def blogToTuple(blog : Blog) = 
    (HttpRequest(method=HttpMethods.GET, uri=getCommentsUri(blog)), blog.id) 

val processBlogsFlow : Flow[Try[Blog], Try[HttpResponse], _] = 
    Flow[Try[Blog]] 
    .map(_ map blogToTuple) 
    .mapAsync(1) { _ match { 
     case Success(req) => 
      Source.single(req).via(pooledConnectionFlow).runWith(Sink.head) 
     case ex => Future { x } 
     } 
    } 

现在Try可以传递给你的Sink它可以在任何错误信息报告,以及报告的有效的响应。