2015-10-05 81 views
1

我正在使用Scala编写Spark流应用程序,其中我的目标是通过每秒读取Twitter源来计算60秒窗口中最多转发的状态。使用reduceByKeyAndWindow()运行在Scala上的滑动窗口使用reduceByKeyAndWindow()

我在概念上想要的是在滑动窗口结束时获取状态转发的数量,并在开始时从等效数字中减去它的数量,以便找到否。在窗口内转推。代码的相关行是:

val counts = tweets.filter(_.isRetweet).map { status => 
       (status.getText(), status.getRetweetedStatus().getRetweetCount()) 
      }.reduceByKeyAndWindow(*function*, Seconds(60), Seconds(1)) 

所以,我的问题是我应该使用什么功能,在这里达到预期的效果,那就是得到最大的价值,该窗口内getRetweetCount()回报,减去最小值它。

回答

0

纠正我,如果我错了或在这里作出错误的假设,但你基本上检查Seconds(60)窗口内的状态转推的数量。要做到这一点,您已经拥有可以移除所有未转推推文的过滤器(filter(_.isRetweet))。现在,您需要做的就是汇总转发的状态以确定其频率。

这可以通过以下操作来实现:

val counts = tweets.filter(_.isRetweet).map { status => 
       (status.getText(), null) 
      }.countByValueAndWindow(Seconds(60), Seconds(1)) 

也许在此之后,你可以通过价值秩序,窗口内云集最多转推的tweets。

+0

这也是我的第一个想法,但后来我意识到,我正在阅读的公共Twitter流只是给我一小部分全球流量,所以我错过了很多推文。这就是为什么我使用getRetweetCount()来查看是否有错过的转发。所以我想要的是,在窗口内获得转推计数的最大值,并从中减去最小值,以确保我捕获了所有内容。 – nikos