2017-03-07 155 views
2

创建从外部文件数据帧DF,其具有以下模式的转置数据帧:如何创建从另一数据帧

(ID,FIELD1,FIELD2,字段3)分配柱:ID

数据的例子是

000, 11_field1, 22_field2, 33_field3 
001, 111_field1, 222_field2, 333_field3 

我想创建DF另一个数据帧,其模式是

(id, fieleName, fieldValue) 

数据的例子是

000, field1, 11_field1 
000, field2, 22_field2 
000, field3, 33_field3 
001, field1, 111_field1 
001, field2, 222_field2 
001, field3, 333_field3 

谁能告诉我如何获得新的数据帧?

+0

你试过我给出的答案吗?答案是否符合您的要求? – User12345

+0

请注意回复评论或回答 – User12345

+0

谢谢您的回答。这个对我有用。 –

回答

2

你可以像下面使用explode选项

首先导入所需的库和功能

from pyspark.sql import SQLContext, Row 

说你的数据帧dfpyspark实现这一目标。

如果你df.show()

你应该得到类似结果如下

+---+----------+----------+----------+ 
| id| field1| field2| field3| 
+---+----------+----------+----------+ 
| 0| 11_field1| 22_field2| 33_field3| 
| 1|111_field1|222_field2|333_field3| 
+---+----------+----------+----------+ 

然后映射要爆炸的2列的所有列。在这里,您希望除ID之外的所有列都会爆炸。所以,下面做

cols= df.columns[1:] 

当时的data frame转换为rdd像下面

rdd = data.rdd.map(lambda x: Row(id=x[0], val=dict(zip(cols, x[1:])))) 

要检查如何RDD已映射不低于

rdd.take() 

你会得到结果如下图所示

[Row(id=0, val={'field2': u'22_field2', 'field3': u'33_field3', 'field1': u'11_field1'}), Row(id=1, val={'field2': u'222_field2', 'field3': u'333_field3', 'field1': u'111_field1'})] 

然后转换rdddata framedf2

df2 = sqlContext.createDataFrame(rdd) 

然后做df2.show()。你应该得到结果如下图所示

+---+--------------------+ 
| id|     val| 
+---+--------------------+ 
| 0|Map(field3 -> 33_...| 
| 1|Map(field3 -> 333...| 
+---+--------------------+ 

然后注册数据帧DF2作为一个临时表

df2.registerTempTable('mytempTable') 

然后运行数据帧像下面的查询:

df3 = sqlContext.sql("""select id,explode(val) AS (fieldname,fieldvalue) from mytempTable""") 

然后做你应该得到如下结果

+---+---------+----------+ 
| id|fieldname|fieldvalue| 
+---+---------+----------+ 
| 0| field3| 33_field3| 
| 0| field2| 22_field2| 
| 0| field1| 11_field1| 
| 1| field3|333_field3| 
| 1| field2|222_field2| 
| 1| field1|111_field1| 
+---+---------+----------+