0
我有一个响应这些两个URI阿卡流链接在一起,流,每个发出HTTP请求
/posts/{id} : get a blog post by ID
/comments?postId={id} : get all the comments on a blog by the blog's id
与一批博客ID的启动宁静的博客服务器(http://jsonplaceholder.typicode.com),我想创建一个流动的是执行该程序的步骤:
- 击中
/posts
端点得到的JSON后 - 反序列化JSON的
Blog
案例类 - 击中
/comments
端点每个博客和获取的评论的JSON列表为后 - 反序列化评论JSON到
Comment
箱子对象的列表 - 请在评论(收集状态或垃圾邮件分析) 一些处理
是的,我知道如果我有博客ID,我可以直接跳到第3步。假装我不能
我想要一堆HTTP请求去服务器在步骤1中。为了实现这一目标,我使用了cachedHostConnectionPool
。以下是我迄今为止:
final case class Blog(id: Int, userId: Int, title: String, body: String)
final case class Comment(id: Int, postId: Int, name: String, email: String, body: String)
object AkkaStreamProcessor extends App {
implicit val actorSystem = ActorSystem("blogProcessor")
import actorSystem.dispatcher
implicit val flowMaterializer = ActorMaterializer()
private def getBlogUri(id: Integer): String = "/posts/" + id
private def getCommentsUri(blog: Blog): String = "/comments?postId=" + blog.id
private def parseBlogResponse(jsonResponse: String): Blog = Json.parse(jsonResponse).as[Blog]
private def parseCommentsResponse(jsonResponse: String): List[Comment] = Json.parse(jsonResponse).as[List[Comment]]
val pooledConnectionFlow = {
val connectionSettings = ConnectionPoolSettings(actorSystem)
.withMaxConnections(32)
.withMaxOpenRequests(32)
.withMaxRetries(3)
Http().cachedHostConnectionPool[Int](host = "jsonplaceholder.typicode.com", settings = connectionSettings)
}
val source = Source(1 to 32)
val fetchBlogsFlow = Flow[Int]
.map((id: Int) => (getBlogUri(id),id))
.map{ case(uri:String, id:Int) => (HttpRequest(method = HttpMethods.GET, uri = uri), id) }
.via(pooledConnectionFlow)
.map { case(response: Try[HttpResponse], id:Int) => handleBlogResponse(response, id) }
.map((jsonText: Try[String]) => jsonText.map(j => parseBlogResponse(j)))
val sink = Sink.foreach[Try[Blog]](blog => blog.map(b=> println(b)))
source.via(fetchBlogsFlow).runWith(sink)
private def handleBlogResponse(response: Try[HttpResponse], id: Int): Try[String] = {
println(s"Received response for id $id on thread ${Thread.currentThread().getName}")
response.flatMap((r: HttpResponse) => {
r.status match {
case StatusCodes.OK => {
Success(Await.result(Unmarshal(r.entity).to[String], Duration.Inf))
}
case _ => Failure(new RuntimeException("Invalid response : " + r.status.toString()))
}
})
}
}
现在,我要的是创造做步骤3和4的另一个流,我会在第一流动后链。但是,我正在为来自第一个流程的讨厌的Try[Blog]
输出而挣扎。如何将Try[Blog]
管道化为另一个HTTP请求?有没有办法分裂管道,失败是一种方式,而成功则是另一种方式?
以下是我对第二流,但我不知道如何使链接工作,而在调用get
的Try
:
val processBlogsFlow = Flow[Try[Blog]]
.map((tryBlog: Try[Blog]) => tryBlog.get)
.map((blog: Blog) => (HttpRequest(method=HttpMethods.GET, uri=getCommentsUri(blog)), blog.id))
.via(pooledConnectionFlow)
如果你想放弃失败,你可以使用'Flow.collect'。如果你想处理失败,你可以对流进行分区,例如使用'Flow.groupBy'或'分区'。当请求导致“失败”时会发生什么? – devkat
它将被记录下来,并可能在稍后的批次中重新排队等待重试。 –