1

我有这些列 ID,价格,时间戳数据帧。pyspark approxQuantile功能

我想找到我使用此代码来找到它,但它给我这个错误的“身份证”

分组中值。

from pyspark.sql import DataFrameStatFunctions as statFunc 
windowSpec = Window.partitionBy("id") 
median = statFunc.approxQuantile("price", 
           [0.5], 
           0) \ 
       .over(windowSpec) 

return df.withColumn("Median", median) 

是没可能使用DataFrameStatFunctions在新列填充值?

TypeError: unbound method approxQuantile() must be called with DataFrameStatFunctions instance as first argument (got str instance instead) 

回答

13

嗯,的确是可以使用approxQuantile在一个新的数据框栏填入值,但这不是为什么你收到此错误。不幸的是,整个故事的下方是一个相当令人沮丧,之一,I have argued即与许多火花(尤其是PySpark)的特点和他们缺乏足够的文档的情况下。

首先,没有一个,但是两个approxQuantile方法;该first one是标准的数据框类的一部分,即你不需要导入DataFrameStatFunctions:

spark.version 
# u'2.1.1' 

sampleData = [("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)] 

df = spark.createDataFrame(sampleData, schema=["Name","Role","Salary"]) 
df.show() 
# +------+---------+------+ 
# | Name|  Role|Salary| 
# +------+---------+------+ 
# | bob|Developer|125000| 
# | mark|Developer|108000| 
# | carl| Tester| 70000| 
# | peter|Developer|185000| 
# | jon| Tester| 65000| 
# | roman| Tester| 82000| 
# | simon|Developer| 98000| 
# | eric|Developer|144000| 
# |carlos| Tester| 75000| 
# | henry|Developer|110000| 
# +------+---------+------+ 

med = df.approxQuantile("Salary", [0.5], 0.25) # no need to import DataFrameStatFunctions 
med 
# [98000.0] 

The second one是DataFrameStatFunctions的一部分,但如果你把它当作你做什么,你得到你报告错误:

from pyspark.sql import DataFrameStatFunctions as statFunc 
med2 = statFunc.approxQuantile("Salary", [0.5], 0.25) 
# TypeError: unbound method approxQuantile() must be called with DataFrameStatFunctions instance as first argument (got str instance instead) 

,因为正确的用法是

med2 = statFunc(df).approxQuantile("Salary", [0.5], 0.25) 
med2 
# [82000.0] 

虽然你不会能够找到的有关这个PySpark文档中一个简单的例子(我花了一些时间来找出自己)......最精彩的部分?这两个值不等于

med == med2 
# False 

我怀疑这是由于所使用的非确定性算法(毕竟,它应该是一个近似中位数),即使你重新用相同的玩具数据运行的命令,你可能会得到不同的值(从那些不同我到这里报到) - 我建议尝试一点点地得到的感觉...

但是,正如我已经说过了,这是不为什么不能使用approxQuantile填补值在新数据帧列的原因 - 即使你使用正确的语法,你会得到一个不同的错误:

df2 = df.withColumn('median_salary', statFunc(df).approxQuantile("Salary", [0.5], 0.25)) 
# AssertionError: col should be Column 

这里,colwithColumn操作,即第二个参数在approxQuantile之一,该错误消息说,这是不是一个Column型 - 事实上,它是一个列表:

type(statFunc(df).approxQuantile("Salary", [0.5], 0.25)) 
# list 

因此,填充列的值时,星火预计Column类型的参数,你不能用链表;这里是创建每个角色的平均值,而不是平均的人一个新列的例子:

import pyspark.sql.functions as func 
from pyspark.sql import Window 

windowSpec = Window.partitionBy(df['Role']) 
df2 = df.withColumn('mean_salary', func.mean(df['Salary']).over(windowSpec)) 
df2.show() 
# +------+---------+------+------------------+ 
# | Name|  Role|Salary|  mean_salary| 
# +------+---------+------+------------------+ 
# | carl| Tester| 70000|   73000.0| 
# | jon| Tester| 65000|   73000.0| 
# | roman| Tester| 82000|   73000.0| 
# |carlos| Tester| 75000|   73000.0| 
# | bob|Developer|125000|128333.33333333333| 
# | mark|Developer|108000|128333.33333333333| 
# | peter|Developer|185000|128333.33333333333| 
# | simon|Developer| 98000|128333.33333333333| 
# | eric|Developer|144000|128333.33333333333| 
# | henry|Developer|110000|128333.33333333333| 
# +------+---------+------+------------------+ 

这工作,因为,违背approxQuantilemean返回Column

type(func.mean(df['Salary']).over(windowSpec)) 
# pyspark.sql.column.Column