2016-02-29 60 views
0

我在cassandra中使用spark/cassandra驱动程序遍历了数十亿行并提取数据以运行统计信息。为了做到这一点,我在每一行数据上运行一个FOR循环,如果它落在我称之为“通道”的一桶数据的标准范围内,那么我将它以K,V对的形式添加到ArrayList中信道,功率。Java Spark flatMap似乎在丢失ArrayList中的项目

[[通道,电源]]

通道应当基于for循环迭代增量是静态的。例如,如果我的频道范围为0到10,增量为2,那么频道将为0,2,4,6,8,10

FOR循环在当前数据行上运行并检查是否数据落入通道中,如果是这样,则将其添加到格式为 [[Channel,Power]]的格式列表数据中

然后进入下一行并执行相同操作。一旦遍历所有行,它就会增加到下一个通道并重复该过程。

问题是有数十亿行符合相同频道的行,所以我不确定我是否应该使用ArrayListflatMap或其他的东西,因为我的结果每次运行时都会略有不同,渠道不是应该是静态的。

数据的小样本[频道,电力]将是:

[[2,5]] 
[[2,10]] 
[[2,5]] 
[[2,15]] 
[[2,5]] 

注意到有我的是需要继续,因为我在每次运行最小值,最大值,平均值统计项目是重复这些渠道。

通道2:闵5,最多15,平均8

我的代码如下:

JavaRDD<MeasuredValue> rdd = javaFunctions(sc).cassandraTable("SparkTestB", "Measured_Value", mapRowTo) 
      .select("Start_Frequency","Bandwidth","Power"); 
    JavaRDD<Value> valueRdd = rdd.flatMap(new FlatMapFunction<MeasuredValue, Value>(){ 
     @Override 
     public Iterable<Value> call(MeasuredValue row) throws Exception { 
     long start_frequency = row.getStart_frequency(); 
     float power = row.getPower(); 
     long bandwidth = row.getBandwidth(); 

     // Define Variable 
     long channel,channel_end, increment; 

     // Initialize Variables 
     channel_end = 10; 
     increment = 2; 

     List<Value> list = new ArrayList<>(); 
     // Create Channel Power Buckets 
     for(channel = 0; channel <= channel_end;){ 
      if((channel >= start_frequency) && (channel <= (start_frequency + bandwidth))) { 
      list.add(new Value(channel, power)); 
      } // end if 
      channel+=increment; 
     } // end for 
     return list; 
     } 
    }); 

    sqlContext.createDataFrame(valueRdd, Value.class).groupBy(col("channel")) 
    .agg(min("power"), max("power"), avg("power")) 
    .write().mode(SaveMode.Append)  
    .option("table", "results") 
    .option("keyspace", "model") 
    .format("org.apache.spark.sql.cassandra").save(); 

我的类是一个遵循用于反射:

public class Value implements Serializable { 
    public Value(Long channel, Float power) { 
     this.channel = channel; 
     this.power = power; 
    } 
    Long channel; 
    Float power; 

    public void setChannel(Long channel) { 
     this.channel = channel; 
    } 
    public void setPower(Float power) { 
     this.power = power; 
    } 
    public Long getChannel() { 
     return channel; 
    } 
    public Float getPower() { 
     return power; 
    } 

    @Override 
    public String toString() { 
     return "[" +channel +","+power+"]"; 
    } 
} 

public static class MeasuredValue implements Serializable { 
     public MeasuredValue() { } 

     public long start_frequency; 
     public long getStart_frequency() { return start_frequency; } 
     public void setStart_frequency(long start_frequency) { this.start_frequency = start_frequency; } 

     public long bandwidth ; 
     public long getBandwidth() { return bandwidth; } 
     public void setBandwidth(long bandwidth) { this.bandwidth = bandwidth; } 

     public float power;  
     public float getPower() { return power; } 
     public void setPower(float power) { this.power = power; } 

    } 
+0

嗯...我看到你使用ArrayList没有问题。 flatMap函数将把列表中的每个元素转换为RDD中的一行。你能不能尝试只计算一次valueRdd中的行数(多次执行valueRdd.count)来验证你正在处理的数据在运行中是否真的没有变化? –

+0

我担心的原因是因为我获得了更准确的信息,我提取的是较小的一组数据。例如,如果我查询1000行,我将获得100个通道的最小值,最大值,平均值的数据,但如果我做了比较大的100,000行查询,我会得到一些奇怪的东西,比如42个通道。如果我查询更多行,我应该有更多而不是更少。似乎事情正在被覆盖或以某种方式清除。 – mithrix

+0

我试着做一个valueRdd.count();但它告诉我计数方法没有找到。 – mithrix

回答

0

我发现这些差异对我的信道化算法有影响。我用以下代替来解决问题。

 // Create Channel Power Buckets 
     for(; channel <= channel_end; channel+=increment){ 
      //Initial Bucket 
      while((start_frequency >= channel) && (start_frequency < (channel + increment))){ 
       list.add(new Value(channel, power)); 
       channel+=increment; 
      } 
      //Buckets to Accomodate for Bandwidth 
      while ((channel <= channel_end) && (channel >= start_frequency) && (start_frequency + bandwidth) >= channel){ 
       list.add(new Value(channel, power));       
       channel+=increment; 
      }     
     }