2017-08-03 52 views
2

我正在使用Spark 2.1.1和dataframe。这里是我的输入数据框:转换数据框:几列按顺序排列

+----+---------+---------+-------+ 
| key|parameter|reference| subkey| 
+----+---------+---------+-------+ 
|key1|  45|  10|subkey1| 
|key1|  45|  20|subkey2| 
|key2|  70|  40|subkey2| 
|key2|  70|  30|subkey1| 
+----+---------+---------+-------+ 

我需要的数据帧转换到下一:

result data (by pandas): 
+-----+-----------+ 
|label| features| 
+-----+-----------+ 
| 45|[10.0,20.0]| 
| 70|[30.0,40.0]| 
+-----+-----------+ 

我能做的改造与大熊猫的帮助:

def convert_to_flat_by_pandas(df): 
    pandas_data_frame = df.toPandas() 
    all_keys = pandas_data_frame['key'].unique() 

    flat_values = [] 
    for key in all_keys: 
     key_rows = pandas_data_frame.loc[pandas_data_frame['key'] == key] 
     key_rows = key_rows.sort_values(by=['subkey']) 

     parameter_values = key_rows['parameter'] 
     parameter_value = parameter_values.real[0]   

     key_reference_value = [reference_values for reference_values in key_rows['reference']] 

     flat_values.append((parameter_value, key_reference_value)) 

    loaded_data = [(label, Vectors.dense(features)) for (label, features) in flat_values] 
    spark_df = spark.createDataFrame(loaded_data, ["label", "features"]) 

    return spark_df 

看来,我需要使用GroupBy,但我不明白如何排序和转换组(几行)单行。

源工作样品(有熊猫的帮助):https://github.com/constructor-igor/TechSugar/blob/master/pythonSamples/pysparkSamples/df_flat.py

随着2个回答可以帮助我得到2个可能的解决方案:

UPD1解决方案#1

def convert_to_flat_by_sparkpy(df): 
    subkeys = df.select("subkey").dropDuplicates().collect() 
    subkeys = [s[0] for s in subkeys] 
    print('subkeys: ', subkeys) 
    assembler = VectorAssembler().setInputCols(subkeys).setOutputCol("features") 
    spark_df = assembler.transform(df.groupBy("key", "parameter").pivot("subkey").agg(first(col("reference"))))  
    spark_df = spark_df.withColumnRenamed("parameter", "label") 
    spark_df = spark_df.select("label", "features") 
    return spark_df 

UPD1解决方案#2

def convert_to_flat_by_sparkpy_v2(df): 
    spark_df = df.orderBy("subkey") 
    spark_df = spark_df.groupBy("key").agg(first(col("parameter")).alias("label"), collect_list("reference").alias("features")) 
    spark_df = spark_df.select("label", "features") 
    return spark_df 
+0

我需要pyspark数据帧分组(而不是在熊猫) – constructor

+0

是什么features'的'类型,可以你显示'printSchema'的输出? –

+0

模式:'模式结果的数据帧: 根 | - label:string(nullable = true) | - features:vector(nullable = true)' – constructor

回答

1

对于已给出的有限的样本的数据,可以转换该数据帧到宽幅与子项作为标题,然后使用VectorAssembler收集它们作为特征:

from pyspark.sql.functions import first, col 
from pyspark.ml.feature import VectorAssembler 

assembler = VectorAssembler().setInputCols(["subkey1", "subkey2"]).setOutputCol("features") 

assembler.transform(
    df.groupBy("key", "parameter").pivot("subkey").agg(first(col("reference"))) 
).show() 
+----+---------+-------+-------+-----------+ 
| key|parameter|subkey1|subkey2| features| 
+----+---------+-------+-------+-----------+ 
|key1|  45|  10|  20|[10.0,20.0]| 
|key2|  70|  30|  40|[30.0,40.0]| 
+----+---------+-------+-------+-----------+ 

更新动态子项:

说,如果你有这样一个数据帧:

df.show() 
+----+---------+---------+-------+  
| key|parameter|reference| subkey| 
+----+---------+---------+-------+ 
|key1|  45|  10|subkey1| 
|key1|  45|  20|subkey2| 
|key2|  70|  40|subkey2| 
|key2|  70|  30|subkey1| 
|key2|  70|  70|subkey3| 
+----+---------+---------+-------+ 

收集所有唯一的子键首先,然后使用子项创建汇编:

subkeys = df.select("subkey").dropDuplicates().rdd.map(lambda r: r[0]).collect() 
assembler = VectorAssembler().setInputCols(subkeys).setOutputCol("features") 

assembler.transform( 
    df.groupBy("key", "parameter").pivot("subkey").agg(first(col("reference"))).na.fill(0) 
).show() 
+----+---------+-------+-------+-------+----------------+ 
| key|parameter|subkey1|subkey2|subkey3|  features| 
+----+---------+-------+-------+-------+----------------+ 
|key1|  45|  10|  20|  0| [20.0,10.0,0.0]| 
|key2|  70|  30|  40|  70|[40.0,30.0,70.0]| 
+----+---------+-------+-------+-------+----------------+ 
+0

这是一个很好的示例,但我有2个问题与我的真实代码:我有随机的子键值,我不能创建Vectors.dense df.features)' – constructor

+0

而不是'subkeys = df.select(“subkey”)。dropDuplicates()。rdd.map(lambda r:r [0])。collect()'我加了''subkeys = df.select “subkey”)。dropDuplicates()。collect() subkeys = [s [0] for s in subkeys]' – constructor

2

您可以使用GROUPBY和collect_list函数来获取输出

import org.apache.spark.sql.functions._ 

df.groupBy("parameter").agg(collect_list("reference").alias("features")) 

df1.withColumnRenamed("parameter", "label") 

输出:

+---------+--------+ 
|parameter|features| 
+---------+--------+ 
|  45|[10, 20]| 
|  70|[40, 30]| 
+---------+--------+ 

希望这有助于!

+0

我认为这会创建一个功能数组而不是矢量 –

+0

我需要通过子项对“功能”进行排序。预期结果:'[10,20]'和[30,40]' – constructor

+0

列“参数”上的值不是唯一的。恐怕我们不能使用'groupBy'中的列。 – constructor