我在cassandra中使用spark/cassandra驱动程序遍历了数十亿行并提取数据以运行统计信息。为了做到这一点,我在每一行数据上运行一个FOR
循环,如果它落在我称之为“通道”的一桶数据的标准范围内,那么我将它以K,V对的形式添加到ArrayList中信道,功率。Java Spark flatMap似乎在丢失ArrayList中的项目
[[通道,电源]]
通道应当基于for循环迭代增量是静态的。例如,如果我的频道范围为0到10,增量为2,那么频道将为0,2,4,6,8,10
FOR
循环在当前数据行上运行并检查是否数据落入通道中,如果是这样,则将其添加到格式为 [[Channel,Power]]的格式列表数据中
然后进入下一行并执行相同操作。一旦遍历所有行,它就会增加到下一个通道并重复该过程。
问题是有数十亿行符合相同频道的行,所以我不确定我是否应该使用ArrayList
和flatMap
或其他的东西,因为我的结果每次运行时都会略有不同,渠道不是应该是静态的。
数据的小样本[频道,电力]将是:
[[2,5]]
[[2,10]]
[[2,5]]
[[2,15]]
[[2,5]]
注意到有我的是需要继续,因为我在每次运行最小值,最大值,平均值统计项目是重复这些渠道。
通道2:闵5,最多15,平均8
我的代码如下:
JavaRDD<MeasuredValue> rdd = javaFunctions(sc).cassandraTable("SparkTestB", "Measured_Value", mapRowTo)
.select("Start_Frequency","Bandwidth","Power");
JavaRDD<Value> valueRdd = rdd.flatMap(new FlatMapFunction<MeasuredValue, Value>(){
@Override
public Iterable<Value> call(MeasuredValue row) throws Exception {
long start_frequency = row.getStart_frequency();
float power = row.getPower();
long bandwidth = row.getBandwidth();
// Define Variable
long channel,channel_end, increment;
// Initialize Variables
channel_end = 10;
increment = 2;
List<Value> list = new ArrayList<>();
// Create Channel Power Buckets
for(channel = 0; channel <= channel_end;){
if((channel >= start_frequency) && (channel <= (start_frequency + bandwidth))) {
list.add(new Value(channel, power));
} // end if
channel+=increment;
} // end for
return list;
}
});
sqlContext.createDataFrame(valueRdd, Value.class).groupBy(col("channel"))
.agg(min("power"), max("power"), avg("power"))
.write().mode(SaveMode.Append)
.option("table", "results")
.option("keyspace", "model")
.format("org.apache.spark.sql.cassandra").save();
我的类是一个遵循用于反射:
public class Value implements Serializable {
public Value(Long channel, Float power) {
this.channel = channel;
this.power = power;
}
Long channel;
Float power;
public void setChannel(Long channel) {
this.channel = channel;
}
public void setPower(Float power) {
this.power = power;
}
public Long getChannel() {
return channel;
}
public Float getPower() {
return power;
}
@Override
public String toString() {
return "[" +channel +","+power+"]";
}
}
public static class MeasuredValue implements Serializable {
public MeasuredValue() { }
public long start_frequency;
public long getStart_frequency() { return start_frequency; }
public void setStart_frequency(long start_frequency) { this.start_frequency = start_frequency; }
public long bandwidth ;
public long getBandwidth() { return bandwidth; }
public void setBandwidth(long bandwidth) { this.bandwidth = bandwidth; }
public float power;
public float getPower() { return power; }
public void setPower(float power) { this.power = power; }
}
嗯...我看到你使用ArrayList没有问题。 flatMap函数将把列表中的每个元素转换为RDD中的一行。你能不能尝试只计算一次valueRdd中的行数(多次执行valueRdd.count)来验证你正在处理的数据在运行中是否真的没有变化? –
我担心的原因是因为我获得了更准确的信息,我提取的是较小的一组数据。例如,如果我查询1000行,我将获得100个通道的最小值,最大值,平均值的数据,但如果我做了比较大的100,000行查询,我会得到一些奇怪的东西,比如42个通道。如果我查询更多行,我应该有更多而不是更少。似乎事情正在被覆盖或以某种方式清除。 – mithrix
我试着做一个valueRdd.count();但它告诉我计数方法没有找到。 – mithrix