
1 Answer
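
For context, the snippets below assume a DataFrame like the following (reconstructed from the output shown; the session setup itself is hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Four numeric columns; the goal is the name of each row's maximum
df = spark.createDataFrame(
    [(2, 7, 8, 6), (11, 5, 9, 4), (6, 15, 12, 3), (5, 3, 7, 8)],
    ["Alice", "Eleonora", "Mike", "Helen"])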

You can chain when conditions to find the column whose value equals the maximum:

import pyspark.sql.functions as psf

# Build a chained when() expression as a string, one branch per column
cond = "psf.when" + ".when".join(["(psf.col('" + c + "') == psf.col('max_value'), psf.lit('" + c + "'))" for c in df.columns])
df.withColumn("max_value", psf.greatest(*df.columns))\
    .withColumn("MAX", eval(cond))\
    .show()

    +-----+--------+----+-----+---------+--------+
    |Alice|Eleonora|Mike|Helen|max_value|     MAX|
    +-----+--------+----+-----+---------+--------+
    |    2|       7|   8|    6|        8|    Mike|
    |   11|       5|   9|    4|       11|   Alice|
    |    6|      15|  12|    3|       15|Eleonora|
    |    5|       3|   7|    8|        8|   Helen|
    +-----+--------+----+-----+---------+--------+
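
If you would rather avoid eval, the same chain of conditions can be built with functools.reduce (a minimal sketch, equivalent to the string-built version above):

from functools import reduce
import pyspark.sql.functions as psf

# Fold the columns into one chained when() expression
cond = reduce(
    lambda acc, c: acc.when(psf.col(c) == psf.col("max_value"), psf.lit(c)),
    df.columns[1:],
    psf.when(psf.col(df.columns[0]) == psf.col("max_value"), psf.lit(df.columns[0])))
df.withColumn("max_value", psf.greatest(*df.columns))\
    .withColumn("MAX", cond)\
    .show()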

Or, build a map of column names to values, then explode and filter:

from itertools import chain

# Explode a {column name: value} map and keep the entries matching the max
df.withColumn("max_value", psf.greatest(*df.columns))\
    .select("*", psf.posexplode(psf.create_map(list(chain(*[(psf.lit(c), psf.col(c)) for c in df.columns])))))\
    .filter("max_value = value")\
    .select(df.columns + [psf.col("key").alias("MAX")])\
    .show()
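
Note that if several columns tie for a row's maximum, this variant emits one output row per tying column; appending dropDuplicates(df.columns) before show() would keep a single one.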

Or, use a UDF on a dictionary:

from itertools import chain
from pyspark.sql.types import StringType

# argmax over a Python dict: return the key holding the largest value
argmax_udf = psf.udf(lambda m: max(m, key=m.get), StringType())
df.withColumn("map", psf.create_map(list(chain(*[(psf.lit(c), psf.col(c)) for c in df.columns]))))\
    .withColumn("MAX", argmax_udf("map"))\
    .drop("map")\
    .show()
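
Both UDF variants trade performance for readability: Python UDFs ship every row between the JVM and a Python worker, so on large data the when/greatest approach above will generally be faster.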

Or, use a UDF that takes the columns as arguments:

from pyspark.sql.types import StringType

def argmax(cols, *args):
    # Return the first column name whose value equals the row maximum
    return [c for c, v in zip(cols, args) if v == max(args)][0]

argmax_udf = lambda cols: psf.udf(lambda *args: argmax(cols, *args), StringType())
df.withColumn("MAX", argmax_udf(df.columns)(*df.columns))\
    .show()
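
For completeness, a UDF-free sketch of the same argmax idea (an alternative not in the original answer): greatest compares struct columns field by field, so pairing each value with its column name makes the struct with the largest value win, assuming all value columns share a comparable type.

import pyspark.sql.functions as psf

# Pair each value with its column name; greatest() orders the structs
# by the first field, so the largest value's name is selected.
df.withColumn(
        "MAX",
        psf.greatest(*[
            psf.struct(psf.col(c).alias("value"), psf.lit(c).alias("name"))
            for c in df.columns
        ]).getField("name"))\
    .show()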