2017-02-27 51 views
0

合并多行一些数据处理后,我结束了该数据集:变换星火Datset - 计数和ID

Dataset<Row> counts //ID,COUNT,DAY_OF_WEEK 

现在我想这个转换为该格式,并保存为CSV:

ID,COUNT_DoW1, ID,COUNT_DoW2, ID,COUNT_DoW3,..ID,COUNT_DoW7 

我能想到的一个办法:

JavaPairRDD<Long, Map<Integer, Integer>> r = counts.toJavaRDD().mapToPair(...) 
JavaPairRDD<Long, Map<Integer, Integer>> merged = r.reduceByKey(...); 

当它一对“ID”的名单和大小的7 获得JavaPairRDD后,我可以将其存储在csv中。没有将其转换为RDD,是否有更简单的方法进行转换?

回答

0

您可以使用struct函数从cnt和day构造一对,然后使用collect_list做一个groupby。 像这样的东西(斯卡拉,但你可以很容易地转换为Java):

df.groupBy("ID").agg(collect_list(struct("COUNT","DAY"))) 

现在你可以写一个UDF,用于提取相关列。所以你只需在循环中做一个withColumn来简单地复制ID(df.withColumn(“id2”,col(“id”)))

然后你创建一个UDF从位置i提取count元素并运行它在所有专栏上,最后一天也是如此。

如果你保留了你想要的订单并丢弃不相关的列,你会得到你所要求的。

您也可以使用转动命令工作(又在斯卡拉但你应该能够很容易地转换成Java):

df.show() 
>>+---+---+---+ 
>>| id|cnt|day| 
>>+---+---+---+ 
>>|333| 31| 1| 
>>|333| 32| 2| 
>>|333|133| 3| 
>>|333| 34| 4| 
>>|333| 35| 5| 
>>|333| 36| 6| 
>>|333| 37| 7| 
>>|222| 41| 4| 
>>|111| 11| 1| 
>>|111| 22| 2| 
>>|111| 33| 3| 
>>|111| 44| 4| 
>>|111| 55| 5| 
>>|111| 66| 6| 
>>|111| 77| 7| 
>>|222| 21| 1| 
>>+---+---+---+ 

val df2 = df.withColumn("all",struct('id, 'cnt' 'day)) 

val res = .groupBy("id").pivot("day").agg(first('all).as("bla")).select("1.*","2.*","3.*", "4.*", "5.*", "6.*", "7.*") 

res.show() 
>>+---+---+---+----+----+----+----+----+----+---+---+---+----+----+----+----+----+----+----+----+----+ 
>>| id|cnt|day| id| cnt| day| id| cnt| day| id|cnt|day| id| cnt| day| id| cnt| day| id| cnt| day| 
>>+---+---+---+----+----+----+----+----+----+---+---+---+----+----+----+----+----+----+----+----+----+ 
>>|333| 31| 1| 333| 32| 2| 333| 133| 3|333| 34| 4| 333| 35| 5| 333| 36| 6| 333| 37| 7| 
>>|222| 21| 1|null|null|null|null|null|null|222| 41| 4|null|null|null|null|null|null|null|null|null| 
>>|111| 11| 1| 111| 22| 2| 111| 33| 3|111| 44| 4| 111| 55| 5| 111| 66| 6| 111| 77| 7| 
>>+---+---+---+----+----+----+----+----+----+---+---+---+----+----+----+----+----+----+----+----+----+