2016-11-13 43 views
0

使用spark,我解析了csv文件,其中每行代表应用程序用户所做的调用。解析后,我得到了JavaRDD对象,它通常包含单个用户的多个条目。解析CSV并聚合相同的记录

现在我想要实现的是总结每个用户的总讲话时间。我遵循了其他地方给出的单词计数示例,并且它也在我的案例中工作,但是,我不确定是否这样是正确的方法,因为我必须将每个解析对象映射到一个单独的键。

我写的代码粘贴在下面,但是,我不确定这是否正确。

JavaRDD <Subscriber> cdrs = textFile.flatMap(new FlatMapFunction < String, Subscriber >() { 
public Iterable <Subscriber> call(String line) { 
    List <Subscriber> list = new ArrayList <Subscriber>(); 
    String[] fields = line.split(","); 

    if (fields != null && fields[0].trim().matches("[0-9]+")) { 
    Subscriber subscriber = new Subscriber(); 
    subscriber.setMsisdn(fields[0].trim()); 
    subscriber.setDuration(Double.parseDouble(fields[5].replaceAll("s", ""))); 

    list.add(subscriber); 
    } 

    return list; 
} 
}); 

JavaPairRDD < String, Subscriber > counts = words.mapToPair(new PairFunction < Subscriber, String, Subscriber >() { 
public Tuple2 < String, Subscriber > call(Subscriber s) { 
    return new Tuple2 < String, Subscriber > (s.getMsisdn(), s); 
} 
}).reduceByKey(new Function2 < Subscriber, Subscriber, Subscriber >() { 
@Override 
public Subscriber call(Subscriber v1, Subscriber v2) throws Exception { 
    v1.setDuration(v1.getDuration() + v2.getDuration()); 
    return v1; 
} 
}); 
+0

您是否仅限于RDD?使用Dataframe是您的选择吗? – Yaron

+0

我可以使用它,它只是我没有太多火花的想法,刚开始使用它 – Waqas

回答

0

我(用火花2.0蟒蛇火花)使用的火花数据框写入以下伪代码:

df = spark.read.format("csv").option("header", "true").load("csv_file.csv") 
new_df = df.groupBy("username").agg(sum("talk_time").alias("total_talk_time"); 

第一行 - 负载CSV到数据帧(见这里https://stackoverflow.com/a/37640154/5088142我的回答更多信息)

第二行 - 列上“talk_time”的集合数据由用户名列组,并且执行sum()函数

上GROUPBY方式/汇总可以在这里找到:http://spark.apache.org/docs/latest/sql-programming-guide.html#dataframegroupby-retains-grouping-columns

新的数据框应该有一个“用户名”列和“total_talk_time”列 - 这将保存您正在寻找的数据。

您必须稍微修改才能将其作为Java-spark执行...