2016-09-19 269 views
0

我想用Scala对日志数据执行一系列转换,而且我在匹配元组时遇到了困难。我有一个数据框与用户ID,网址和日期。我可以将数据帧映射到RDD以及主要与此映射减少:Scala Spark映射类型匹配问题

val countsRDD = usersUrlsDays.map { case Row(date:java.sql.Date, user_id:Long, url:String) => Tuple2(Tuple2(user_id, url), 1) }.rdd.reduceByKey(_+_) 

这让我的RDD((USER_ID,URL),计数):

scala> countsRDD.take(1) 
res9: Array[((Long, String), Int)]  
scala> countsRDD.take(1)(0) 
res10: ((Long, String), Int) 

现在我想反转通过URL来获得:

(url, [(user_id, count), ...]) 

我已经试过这样:

val urlIndex = countsRDD.map{ case Row(((user_id:Long, url:String), count:Int)) => Tuple2(url, List(Tuple2(user_id, count))) }.reduceByKey(_++_) 

这将产生匹配误差,但是:

scala.MatchError: ... (of class scala.Tuple2) 

我已经试过这两个地图明确和隐含的类型,这似乎呼吁已经得到了我最远的很多很多不同的排列。我希望有人能帮助我指出正确的方向。

回答

2

像这样的东西应该工作:

countsRDD 
    .map{ case ((user_id, url), count) => (url, (user_id, count)) } 
    .groupByKey 
  • countsRDDRDD[((String, String), Int)]RDD[Row]
  • 没有必要使用TupleN。元组文字可以正常工作。
  • 由于countsRDD是静态类型的(不像RDD[Row]),您不必指定类型。
  • 请勿使用reduceByKey进行列表连接。这是你可以采取的最糟糕的方法,并忽略计算复杂性,垃圾收集器和常识。 如果您真的需要分组数据使用操作是专为它设计的。