这是不是很清楚你想从响应中得到什么。 stream()
返回Future
,因此如果你在flatMap
上,你还需要从处理程序返回未来。你想在那里返回什么?例如,如果你想获得Future[String]
与响应的整个身体,你可以使用runReduce(_ + _)
:
val result: Future[String] = ws.url(url).stream()
.flatMap(response => response.body
.via(framing)
.map(_.utf8String)
.map(_ + "\n")
.runReduce(_ + _)
)
runReduce(f: (U, U) => U)
返回Future[U]
,那就是,在你的情况下,将Future[String]
。如果你想通过一些其他的功能分别处理输入流的每个元素,你可以使用runForeach
:
ws.url(url).stream()
.flatMap(response => response.body
.via(framing)
.map(_.utf8String)
.map(_ + "\n")
.runForeach(s => handleString(s))
)
没有你想要做的更多的细节,这是很难提供一个更具体的答案。
更新:如果要限制从外部服务器来的邮件,你可以使用内置的throttle
组合子:
val result: Future[Source[String, _]] = ws.url(url)
.stream()
.map { response =>
response.body
.via(framing)
.map(_.utf8String)
.map(_ + "\n")
.throttle(10, 1.second, 10, ThrottleMode.Shaping)
}
这里,result
未来将包含流String
s,实现时,每秒钟最多可产生10个元素,必要时进行反压。您可以在我上面链接的文档中找到更多信息。
谢谢 - 输出是JSON字符串。我想要得到这个字符串,并用scala来表示它并更新我的案例类。 –
@SakarSR,我还是不明白,对不起。 “短语”是什么意思?考虑编写一个你想在没有反应流的情况下做什么的例子。 –
用户名:@Vladimir url web服务器连续不断地发送json数据。我想捕获数据并对数据进行一些计算并将其传递给前端。我也想减慢我的网络服务器(反压)。计划使用Akka流。我希望它清楚。 –