我有一个像这样的RDD:RDD[(Any, Array[(Any, Any)])]
我只是想将它转换成一个DataFrame。因此,我用这个模式将Rdd转换为数据帧
val schema = StructType(Array (StructField("C1", StringType, true), StructField("C4", ArrayType(StringType, false), false)))
val df = Seq(
("A",1,"12/06/2012"),
("A",2,"13/06/2012"),
("B",3,"12/06/2012"),
("B",4,"17/06/2012"),
("C",5,"14/06/2012")).toDF("C1", "C2","C3")
df.show(false)
val rdd = df.map(line => (line(0), (line(1), line(2))))
.groupByKey()
.mapValues(i => i.toList).foreach(println)
val output_df = sqlContext.createDataFrame(rdd, schema)
我RDD这个样子的:
(B,List((3,12/06/2012), (4,17/06/2012)))
(A,List((1,12/06/2012), (2,13/06/2012)))
(C,List((5,14/06/2012)))
或类似这样的
(A,[Lscala.Tuple2;@3e8f27c9)
(C,[Lscala.Tuple2;@6f22defb)
(B,[Lscala.Tuple2;@1b8692ec)
如果我使用:
.mapValues(i => i.toArray)
我已经尝试此:
val output_df = sqlContext.createDataFrame(rdd, schema)
,但我得到:
Error:(40, 32) overloaded method value createDataFrame with alternatives:
(data: java.util.List[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
(rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
(rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
(rows: java.util.List[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
(rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
(rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
cannot be applied to (Unit, org.apache.spark.sql.types.StructType)
val output_df = sqlContext.createDataFrame(rdd, schema)
拉斐尔·罗斯
尝试第二种方法至极不起作用,我得到:
Error:(41, 24) No TypeTag available for MySchema
val newdf = rdd.map(line => MySchema(line._1.toString, line._2.asInstanceOf[List[(Int, String)]])).toDF()
第一种方法的工作很好,但我失去了我的元组的第一个元素与.mapValues(i => i.map(_._2))
你知道我是否可以完成fi第一个方法,以保持两个元素
我决定把它转换我的元组字符串,但这不是根据我,因为我将要分裂我的字符串元组读取列优雅的解决方案:
val rdd = df.map(line => (line(0), (line(1), line(2)))).groupByKey()
.mapValues(i => i.map(w => (w._1,w._2).toString))
.map(i=>Row(i._1,i._2))
谢谢你的帮助
可能重复的[如何将rdd对象转换为火花中的数据帧](http://stackoverflow.com/questions/29383578/how-to-convert-rdd-object-to-dataframe-in-spark) – cheseaux
我认为这将有助于如果您将错误添加到问题 – maasg
@a。 moussa解决'没有TypeTag可用于MySchema',你必须定义主要方法以外的案例类(如果有的话) –