This code is my adaptation of the "Summary Statistics" example from https://spark.apache.org/docs/latest/mllib-statistics.html. Specifically, I filter out the header string with pDataI = mat.zipWithIndex().filter(lambda x: x[1] > 0).map(lambda x: x[0])
Performance analysis of the PySpark filter code
Breaking down the cost of each step:
mat.zipWithIndex() is O(sizeOf mat)
filter(lambda x: x[1] > 0) is O(sizeOf mat)
map(lambda x: x[0]) is O(sizeOf mat)
It seems I'm doing a lot of computation just to remove the first row from the collection.
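The three chained transformations above can be sketched in plain Python (no Spark, assuming the elements arrive in order; note that RDD.zipWithIndex yields (element, index) pairs, whereas Python's enumerate yields (index, element)):

```python
# Pure-Python sketch of the three chained RDD transformations (no Spark).
mat = ['c1,c2,c3',
       [1.0, 10.0, 100.0],
       [2.0, 20.0, 200.0],
       [3.0, 30.0, 300.0]]

indexed = [(x, i) for i, x in enumerate(mat)]   # zipWithIndex: (element, index)
kept = [p for p in indexed if p[1] > 0]         # filter(lambda x: x[1] > 0)
pDataI = [p[0] for p in kept]                   # map(lambda x: x[0])
# pDataI now holds the three numeric rows with the header dropped
```

Worth noting: Spark transformations are lazy, so the filter and map are pipelined into a single pass over the data when an action runs; zipWithIndex, however, does launch an extra Spark job to count the elements per partition when the RDD has more than one partition.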
The full code:
import numpy as np
from pyspark.mllib.stat import Statistics
data = []
data.append('c1,c2,c3')
data.append(np.array([1.0, 10.0, 100.0]))
data.append(np.array([2.0, 20.0, 200.0]))
data.append(np.array([3.0, 30.0, 300.0]))
mat = sc.parallelize(
data
) # an RDD of Vectors
pDataI = mat.zipWithIndex().filter(lambda x: x[1] > 0).map(lambda x: x[0])
summary = Statistics.colStats(pDataI)
print(summary.mean()) # a dense vector containing the mean value for each column
print(summary.variance()) # column-wise variance
print(summary.numNonzeros()) # number of nonzeros in each column
print(pDataI.stats())
Prints:
[ 2. 20. 200.]
[ 1.00000000e+00 1.00000000e+02 1.00000000e+04]
[ 3. 3. 3.]
(count: 3, mean: [ 2. 20. 200.], stdev: [ 0.81649658 8.16496581 81.64965809], max: [ 3. 30. 300.], min: [ 1. 10. 100.])
Can this be improved to process the mat collection in a single pass?
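One common alternative (a sketch, not taken from this thread) is to drop the first element of the first partition with RDD.mapPartitionsWithIndex, avoiding the index-pairing step entirely. The partition function below is plain Python and assumes the header sits at the start of partition 0, which holds for sc.parallelize with a non-empty first partition:

```python
from itertools import islice

def drop_header(part_idx, it):
    """Partition function for RDD.mapPartitionsWithIndex: skip the first
    element of partition 0, pass every other partition through unchanged."""
    return islice(it, 1, None) if part_idx == 0 else it

# With Spark this would be:
#   pDataI = mat.mapPartitionsWithIndex(drop_header)
```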
Why not just use 'pyspark.DataFrame'? – Grr
@Grr How would I convert mat to a pyspark.DataFrame? –
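A sketch of how the data could be reshaped for spark.createDataFrame: split the header string into column names and turn the vectors into row tuples. The Spark call itself is commented out since it needs a live SparkSession; everything else is plain Python:

```python
# Reshape the header-plus-vectors list into (rows, schema) for createDataFrame.
data = ['c1,c2,c3',
        [1.0, 10.0, 100.0],
        [2.0, 20.0, 200.0],
        [3.0, 30.0, 300.0]]

columns = data[0].split(',')         # ['c1', 'c2', 'c3'] -> column names
rows = [tuple(v) for v in data[1:]]  # [(1.0, 10.0, 100.0), ...] -> row tuples

# With a SparkSession available, this would be:
#   df = spark.createDataFrame(rows, schema=columns)
#   df.describe().show()   # column-wise count / mean / stddev / min / max
```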