2017-03-04 130 views
0

如何在下面的代码返回concurrent.Future:如何处理流是来自WSClient在阿卡流数据2

val nextMeetup = ws.url(url).stream() 
     .flatMap(response => response.body 
      .via(framing) 
      .map(_.utf8String) 
      .map(_ + "\n") 
     ) 

类型不匹配错误: 发现:akka.stream.scaladsl.Source [字符串,_ $ 2] 要求:scala.concurrent.Future []

我的网址分裂JSON数据流 - 它不是一个Twitter流

请解释来解决这个问题?

回答

0

这是不是很清楚你想从响应中得到什么。 stream()返回Future,因此如果你在flatMap上,你还需要从处理程序返回未来。你想在那里返回什么?例如,如果你想获得Future[String]与响应的整个身体,你可以使用runReduce(_ + _)

val result: Future[String] = ws.url(url).stream() 
    .flatMap(response => response.body 
     .via(framing) 
     .map(_.utf8String) 
     .map(_ + "\n") 
     .runReduce(_ + _) 
    ) 

runReduce(f: (U, U) => U)返回Future[U],那就是,在你的情况下,将Future[String]。如果你想通过一些其他的功能分别处理输入流的每个元素,你可以使用runForeach

ws.url(url).stream() 
    .flatMap(response => response.body 
     .via(framing) 
     .map(_.utf8String) 
     .map(_ + "\n") 
     .runForeach(s => handleString(s)) 
    ) 

没有你想要做的更多的细节,这是很难提供一个更具体的答案。


更新:如果要限制从外部服务器来的邮件,你可以使用内置的throttle组合子:

val result: Future[Source[String, _]] = ws.url(url) 
    .stream() 
    .map { response => 
    response.body 
     .via(framing) 
     .map(_.utf8String) 
     .map(_ + "\n") 
     .throttle(10, 1.second, 10, ThrottleMode.Shaping) 
    } 

这里,result未来将包含流String s,实现时,每秒钟最多可产生10个元素,必要时进行反压。您可以在我上面链接的文档中找到更多信息。

+0

谢谢 - 输出是JSON字符串。我想要得到这个字符串,并用scala来表示它并更新我的案例类。 –

+0

@SakarSR,我还是不明白,对不起。 “短语”是什么意思?考虑编写一个你想在没有反应流的情况下做什么的例子。 –

+0

用户名:@Vladimir url web服务器连续不断地发送json数据。我想捕获数据并对数据进行一些计算并将其传递给前端。我也想减慢我的网络服务器(反压)。计划使用Akka流。我希望它清楚。 –