2015-09-04 67 views
0

我想计算每个键的值的PairDStream的中值。Apache Spark Streaming:通过密钥的窗口化PairDStream的中值

我已经尝试以下,这是非常效率不高:

JavaPairDStream<String, Iterable<Float>> groupedByKey = pairDstream.groupByKey(); 

JavaPairDStream<String, Float> medianPerPlug1h = groupedByKey.transformToPair(new Function<JavaPairRDD<String,Iterable<Float>>, JavaPairRDD<String,Float>>() { 
     public JavaPairRDD<String, Float> call(JavaPairRDD<String, Iterable<Float>> v1) throws Exception { 
      return v1.mapValues(new Function<Iterable<Float>, Float>() { 
       public Float call(Iterable<Float> v1) throws Exception { 

        List<Float> buffer = new ArrayList<Float>(); 

        long count = 0L; 
        Iterator<Float> iterator = v1.iterator(); 
        while(iterator.hasNext()) { 
         buffer.add(iterator.next()); 
         count++; 
        } 

        float[] values = new float[(int)count]; 

        for(int i = 0; i < buffer.size(); i++) { 
         values[i] = buffer.get(i); 
        } 

        Arrays.sort(values); 

        float median; 

        int startIndex; 

        if(count % 2 == 0) { 
         startIndex = (int)(count/2 - 1); 

         float a = values[startIndex]; 
         float b = values[startIndex + 1]; 

         median = (a + b)/2.0f; 
        } else { 
         startIndex = (int)(count/2); 

         median = values[startIndex]; 
        } 

        return median; 
       } 
      }); 
     } 
}); 

medianPerPlug1h.print(); 

有人可以帮助我更有效的交易?我有大约1950个不同的密钥,每个密钥可以达到3600(每秒1个数据点,1小时窗口)值,在哪里可以找到中值。

谢谢!

+0

你需要多长时间一次计算中位数?你在使用滑动窗口吗? – vanekjar

+0

实际上我使用的是一个1h的窗口(所以数据是受限制的,并且在这个预热时间之后不会增长),并且每个幻灯片的持续时间和间隔为2秒。我可以增加批次和滑动间隔,但我想尽可能快地进行计算。 对我来说,找到一个更好的转换来获得更多Spark的并行算法会更有趣。 –

回答

0

首先,我不知道你为什么使用Spark来完成这种任务。考虑到你只有几千个值,它似乎与大数据无关。它可能使事情更加复杂。但是让我们假设你正在计划扩大到更大的数据集。

我会尝试使用一些更优化的算法来查找中值,而不仅仅是排序值。排序值的数组运行于O(n * log n)时间。

你可以考虑使用一些线性时间的中位数算法像Median of medians

+0

我刚刚学习Spark,想要尝试并行计算数据集有趣值的能力。它应该是稍后适应更大数据集的原型。但是,对于庞大的数据集,这种逻辑似乎太昂贵了,正如你上面提到的那样。 我会在接下来的几天看看Medians的中位数,谢谢你的信息! –

0

1)避免使用groupbykey; reducebykey比groupbykey更高效。 2)reduceByKeyAndWindow(Function2,windowduration,slideDuration)可以更好地为您服务。

例如: JavaPairDStream合并= yourRDD.reduceByKeyAndWindow(新功能2(){ 公共字符串呼叫(字符串为arg0,字符串ARG1)抛出异常{ 返回为arg0 + “” + ARG1; } },Durations.seconds (windowDur),Durations.seconds(slideDur));

假设此RDD的输出如下: (key,1,2,3,4,5,6,7) (key,1,2,3,4,5,6,7) 。 现在对于每个键,你可以解析这个,你将会得到如下数值:012 + 1 + 2 + 3 + 4 + 5 + 6 + 7/count。

我希望它有帮助:)