2017-01-16

Is there an equivalent of pandas' melt function in Apache Spark, in PySpark or at least in Scala? How do I melt a Spark DataFrame?

Until now I have been running on a sample dataset in Python, and now I want to use Spark for the entire dataset.

Thanks in advance.

Check this: http://chappers.github.io/web%20micro%20log/2016/03/07/implementing-simple-melt-function-for-pyspark/ – MYGz

Sorry for the delayed response... even for the small sample dataset (rdd) created with rdd = sc.parallelize([("x", 1, 4), ("y", 3, 5), ("z", 2, 6)]) –

Answers


There is no built-in function (if you work with SQL and have Hive support enabled you can use the stack function, but it is not exposed in the Spark DataFrame API and has no native implementation), but it is trivial to roll your own. Required imports:

from pyspark.sql.functions import array, col, explode, lit, struct 
from pyspark.sql import DataFrame 
from typing import Iterable 

Example implementation:

def melt(
        df: DataFrame,
        id_vars: Iterable[str], value_vars: Iterable[str],
        var_name: str = "variable", value_name: str = "value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name))
        for c in value_vars))

    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    cols = list(id_vars) + [
        col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

And some tests (based on pandas doctests):

import pandas as pd 

pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                    'B': {0: 1, 1: 3, 2: 5},
                    'C': {0: 2, 1: 4, 2: 6}})

pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C'])
   A variable  value
0  a        B      1
1  b        B      3
2  c        B      5
3  a        C      2
4  b        C      4
5  c        C      6

sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=['A'], value_vars=['B', 'C']).show()
+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|    1|
|  a|       C|    2|
|  b|       B|    3|
|  b|       C|    4|
|  c|       B|    5|
|  c|       C|    6|
+---+--------+-----+

Note: to use this with legacy Python versions, remove the type annotations.


I came across this question while searching for a Scala implementation for Spark. Posting my Scala port in case someone else stumbles upon this.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame

/** Extends the [[org.apache.spark.sql.DataFrame]] class
  *
  * @param df the data frame to melt
  */
implicit class DataFrameFunctions(df: DataFrame) {

  /** Convert [[org.apache.spark.sql.DataFrame]] from wide to long format.
    *
    * melt is (kind of) the inverse of pivot
    * melt is currently (02/2017) not implemented in Spark
    *
    * @see reshape package in R (https://cran.r-project.org/web/packages/reshape/index.html)
    * @see this is a Scala adaptation of http://stackoverflow.com/questions/41670103/pandas-melt-function-in-apache-spark
    *
    * @todo method overloading for simpler calling
    *
    * @param id_vars the columns to preserve
    * @param value_vars the columns to melt
    * @param var_name the name for the column holding the melted columns' names
    * @param value_name the name for the column holding the values of the melted columns
    */
  def melt(
      id_vars: Seq[String], value_vars: Seq[String],
      var_name: String = "variable", value_name: String = "value"): DataFrame = {

    // Create array<struct<variable: str, value: ...>>
    val _vars_and_vals = array(
      value_vars.map(c => struct(lit(c).alias(var_name), col(c).alias(value_name))): _*)

    // Add to the DataFrame and explode
    val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    val cols = id_vars.map(col) ++
      List(var_name, value_name).map(x => col("_vars_and_vals")(x).alias(x))

    _tmp.select(cols: _*)
  }
}

Since I am not that advanced in Scala, I am sure there is room for improvement. Any comments are welcome.