2016-11-09 121 views
-1

我有一个像这样的RDD:RDD[(Any, Array[(Any, Any)])] 我只是想将它转换成一个DataFrame。因此,我用这个模式将Rdd转换为数据帧

val schema = StructType(Array (StructField("C1", StringType, true), StructField("C4", ArrayType(StringType, false), false))) 

val df = Seq(
    ("A",1,"12/06/2012"), 
    ("A",2,"13/06/2012"), 
    ("B",3,"12/06/2012"), 
    ("B",4,"17/06/2012"), 
    ("C",5,"14/06/2012")).toDF("C1", "C2","C3") 
df.show(false) 

val rdd = df.map(line => (line(0), (line(1), line(2)))) 
    .groupByKey() 
    .mapValues(i => i.toList).foreach(println) 

val output_df = sqlContext.createDataFrame(rdd, schema) 

我RDD这个样子的:

(B,List((3,12/06/2012), (4,17/06/2012)))  
(A,List((1,12/06/2012), (2,13/06/2012)))  
(C,List((5,14/06/2012))) 

或类似这样的

(A,[Lscala.Tuple2;@3e8f27c9) 
(C,[Lscala.Tuple2;@6f22defb) 
(B,[Lscala.Tuple2;@1b8692ec) 

如果我使用:

.mapValues(i => i.toArray) 

我已经尝试此:

val output_df = sqlContext.createDataFrame(rdd, schema) 

,但我得到:

Error:(40, 32) overloaded method value createDataFrame with alternatives: 
    (data: java.util.List[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and> 
    (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and> 
    (rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and> 
    (rows: java.util.List[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and> 
    (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and> 
    (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame 
cannot be applied to (Unit, org.apache.spark.sql.types.StructType) 
    val output_df = sqlContext.createDataFrame(rdd, schema) 

拉斐尔·罗斯
尝试第二种方法至极不起作用,我得到:

Error:(41, 24) No TypeTag available for MySchema 
    val newdf = rdd.map(line => MySchema(line._1.toString, line._2.asInstanceOf[List[(Int, String)]])).toDF() 

第一种方法的工作很好,但我失去了我的元组的第一个元素与.mapValues(i => i.map(_._2))

你知道我是否可以完成fi第一个方法,以保持两个元素

我决定把它转换我的元组字符串,但这不是根据我,因为我将要分裂我的字符串元组读取列优雅的解决方案:

val rdd = df.map(line => (line(0), (line(1), line(2)))).groupByKey() 
     .mapValues(i => i.map(w => (w._1,w._2).toString)) 
     .map(i=>Row(i._1,i._2)) 

谢谢你的帮助

+0

可能重复的[如何将rdd对象转换为火花中的数据帧](http://stackoverflow.com/questions/29383578/how-to-convert-rdd-object-to-dataframe-in-spark) – cheseaux

+3

我认为这将有助于如果您将错误添加到问题 – maasg

+0

@a。 moussa解决'没有TypeTag可用于MySchema',你必须定义主要方法以外的案例类(如果有的话) –

回答

0

GroupByKey给你一个元组的Seq,你没有考虑到你的模式。此外,sqlContext.createDataFrame需要您没有提供的RDD[Row]

这应该使用schema

val rdd = df.map(line => (line(0), (line(1), line(2)))) 
    .groupByKey() 
    .mapValues(i => i.map(_._2)) 
    .map(i=>Row(i._1,i._2)) 

val output_df = sqlContext.createDataFrame(rdd, schema) 

你也可以使用一个case class可用于映射元组(不知道元组架构可以编程方式创建的):

val df = Seq(
     ("A", 1, "12/06/2012"), 
     ("A", 2, "13/06/2012"), 
     ("B", 3, "12/06/2012"), 
     ("B", 4, "17/06/2012"), 
     ("C", 5, "14/06/2012")).toDF("C1", "C2", "C3") 
    df.show(false) 

    val rdd = df.map(line => (line(0), (line(1), line(2)))) 
     .groupByKey() 
     .mapValues(i => i.toList) 

    // this should be placed outside of main() 
    case class MySchema(C1: String, C4: List[(Int, String)]) 

    val newdf = rdd.map(line => MySchema(line._1.toString, line._2.asInstanceOf[List[(Int, String)]])).toDF() 
+0

嗨,谢谢你的回答,它不起作用,我用你的评论完成我的问题。如果你有任何想法,这是真正有用的。 –

+0

非常感谢你,当我将MySchema移动到我的方法之外时,它工作得非常好 –