嗯,的确是不可以使用approxQuantile
在一个新的数据框栏填入值,但这不是为什么你收到此错误。不幸的是,整个故事的下方是一个相当令人沮丧,之一,I have argued即与许多火花(尤其是PySpark)的特点和他们缺乏足够的文档的情况下。
首先,没有一个,但是两个approxQuantile
方法;该first one是标准的数据框类的一部分,即你不需要导入DataFrameStatFunctions:
spark.version
# u'2.1.1'
sampleData = [("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)]
df = spark.createDataFrame(sampleData, schema=["Name","Role","Salary"])
df.show()
# +------+---------+------+
# | Name| Role|Salary|
# +------+---------+------+
# | bob|Developer|125000|
# | mark|Developer|108000|
# | carl| Tester| 70000|
# | peter|Developer|185000|
# | jon| Tester| 65000|
# | roman| Tester| 82000|
# | simon|Developer| 98000|
# | eric|Developer|144000|
# |carlos| Tester| 75000|
# | henry|Developer|110000|
# +------+---------+------+
med = df.approxQuantile("Salary", [0.5], 0.25) # no need to import DataFrameStatFunctions
med
# [98000.0]
The second one是DataFrameStatFunctions的一部分,但如果你把它当作你做什么,你得到你报告错误:
from pyspark.sql import DataFrameStatFunctions as statFunc
med2 = statFunc.approxQuantile("Salary", [0.5], 0.25)
# TypeError: unbound method approxQuantile() must be called with DataFrameStatFunctions instance as first argument (got str instance instead)
,因为正确的用法是
med2 = statFunc(df).approxQuantile("Salary", [0.5], 0.25)
med2
# [82000.0]
虽然你不会能够找到的有关这个PySpark文档中一个简单的例子(我花了一些时间来找出自己)......最精彩的部分?这两个值不等于:
med == med2
# False
我怀疑这是由于所使用的非确定性算法(毕竟,它应该是一个近似中位数),即使你重新用相同的玩具数据运行的命令,你可能会得到不同的值(从那些不同我到这里报到) - 我建议尝试一点点地得到的感觉...
但是,正如我已经说过了,这是不为什么不能使用approxQuantile
填补值在新数据帧列的原因 - 即使你使用正确的语法,你会得到一个不同的错误:
df2 = df.withColumn('median_salary', statFunc(df).approxQuantile("Salary", [0.5], 0.25))
# AssertionError: col should be Column
这里,col
指withColumn
操作,即第二个参数在approxQuantile
之一,该错误消息说,这是不是一个Column
型 - 事实上,它是一个列表:
type(statFunc(df).approxQuantile("Salary", [0.5], 0.25))
# list
因此,填充列的值时,星火预计Column
类型的参数,你不能用链表;这里是创建每个角色的平均值,而不是平均的人一个新列的例子:
import pyspark.sql.functions as func
from pyspark.sql import Window
windowSpec = Window.partitionBy(df['Role'])
df2 = df.withColumn('mean_salary', func.mean(df['Salary']).over(windowSpec))
df2.show()
# +------+---------+------+------------------+
# | Name| Role|Salary| mean_salary|
# +------+---------+------+------------------+
# | carl| Tester| 70000| 73000.0|
# | jon| Tester| 65000| 73000.0|
# | roman| Tester| 82000| 73000.0|
# |carlos| Tester| 75000| 73000.0|
# | bob|Developer|125000|128333.33333333333|
# | mark|Developer|108000|128333.33333333333|
# | peter|Developer|185000|128333.33333333333|
# | simon|Developer| 98000|128333.33333333333|
# | eric|Developer|144000|128333.33333333333|
# | henry|Developer|110000|128333.33333333333|
# +------+---------+------+------------------+
这工作,因为,违背approxQuantile
,mean
返回Column
:
type(func.mean(df['Salary']).over(windowSpec))
# pyspark.sql.column.Column