2017-08-15 279 views
0

我有一个RDD的分区包含元素(熊猫数据框,因为它发生),可以很容易地变成行列表。把它看成是看起来像这样RDD的pyspark行列表DataFrame

rows_list = [] 
for word in 'quick brown fox'.split(): 
    rows = [] 
    for i,c in enumerate(word): 
     x = ord(c) + i 
     row = pyspark.sql.Row(letter=c, number=i, importance=x) 
     rows.append(row) 
    rows_list.append(rows) 
rdd = sc.parallelize(rows_list) 
rdd.take(2) 

这给

[[Row(importance=113, letter='q', number=0), 
    Row(importance=118, letter='u', number=1), 
    Row(importance=107, letter='i', number=2), 
    Row(importance=102, letter='c', number=3), 
    Row(importance=111, letter='k', number=4)], 
[Row(importance=98, letter='b', number=0), 
    Row(importance=115, letter='r', number=1), 
    Row(importance=113, letter='o', number=2), 
    Row(importance=122, letter='w', number=3), 
    Row(importance=114, letter='n', number=4)]] 

我希望把它变成一个Spark数据帧。我希望我可以做

rdd.toDF() 

,但给人的无用结构

DataFrame[_1: struct<importance:bigint,letter:string,number:bigint>, 
      _2: struct<importance:bigint,letter:string,number:bigint>, 
      _3: struct<importance:bigint,letter:string,number:bigint>, 
      _4: struct<importance:bigint,letter:string,number:bigint>, 
      _5: struct<importance:bigint,letter:string,number:bigint>] 

我真正想要的是一个3列数据框,如该

desired_df = sql_context.createDataFrame(sum(rows_list, [])) 

让我可执行如下操作:

desired_df.agg(pyspark.sql.functions.sum('number')).take(1) 

并得到答案23.

什么是正确的方式去做这件事?

回答

1

当您需要行的RDD时,您拥有行列表的RDD;您可以将rddflatMap放在一起,然后将其转换为数据帧:

rdd.flatMap(lambda x: x).toDF().show() 

+----------+------+------+ 
|importance|letter|number| 
+----------+------+------+ 
|  113|  q|  0| 
|  118|  u|  1| 
|  107|  i|  2| 
|  102|  c|  3| 
|  111|  k|  4| 
|  98|  b|  0| 
|  115|  r|  1| 
|  113|  o|  2| 
|  122|  w|  3| 
|  114|  n|  4| 
|  102|  f|  0| 
|  112|  o|  1| 
|  122|  x|  2| 
+----------+------+------+ 

import pyspark.sql.functions as F 

rdd.flatMap(lambda x: x).toDF().agg(F.sum('number')).show() 
+-----------+ 
|sum(number)| 
+-----------+ 
|   23| 
+-----------+