使用spark,我解析了csv文件,其中每行代表应用程序用户所做的调用。解析后,我得到了JavaRDD对象,它通常包含单个用户的多个条目。解析CSV并聚合相同的记录
现在我想要实现的是总结每个用户的总讲话时间。我遵循了其他地方给出的单词计数示例,并且它也在我的案例中工作,但是,我不确定是否这样是正确的方法,因为我必须将每个解析对象映射到一个单独的键。
我写的代码粘贴在下面,但是,我不确定这是否正确。
JavaRDD <Subscriber> cdrs = textFile.flatMap(new FlatMapFunction < String, Subscriber >() {
public Iterable <Subscriber> call(String line) {
List <Subscriber> list = new ArrayList <Subscriber>();
String[] fields = line.split(",");
if (fields != null && fields[0].trim().matches("[0-9]+")) {
Subscriber subscriber = new Subscriber();
subscriber.setMsisdn(fields[0].trim());
subscriber.setDuration(Double.parseDouble(fields[5].replaceAll("s", "")));
list.add(subscriber);
}
return list;
}
});
JavaPairRDD < String, Subscriber > counts = words.mapToPair(new PairFunction < Subscriber, String, Subscriber >() {
public Tuple2 < String, Subscriber > call(Subscriber s) {
return new Tuple2 < String, Subscriber > (s.getMsisdn(), s);
}
}).reduceByKey(new Function2 < Subscriber, Subscriber, Subscriber >() {
@Override
public Subscriber call(Subscriber v1, Subscriber v2) throws Exception {
v1.setDuration(v1.getDuration() + v2.getDuration());
return v1;
}
});
您是否仅限于RDD?使用Dataframe是您的选择吗? – Yaron
我可以使用它,它只是我没有太多火花的想法,刚开始使用它 – Waqas