2017-02-21 68 views
0

此代码是我编辑的“汇总统计”https://spark.apache.org/docs/latest/mllib-statistics.html。具体来说,我过滤字符串的组合和使用pDataI = mat.zipWithIndex().filter(lambda x: x[1] > 0).map(lambda x: x[0])pyspark过滤代码的性能分析

打破下来浮动值:

mat.zipWithIndex() is O(sizeOf mat) 
filter(lambda x: x[1] > 0) is O(sizeOf mat) 
map(lambda x: x[0]) is O(sizeOf mat) 

我似乎执行了很多的计算,只是从 集合中删除的第一行。

整个代码:

import numpy as np 
from pyspark.mllib.stat import Statistics 

data = [] 

data.append('c1,c2,c3') 
data.append(np.array([1.0, 10.0, 100.0])) 
data.append(np.array([2.0, 20.0, 200.0])) 
data.append(np.array([3.0, 30.0, 300.0])) 

mat = sc.parallelize(
    data 
) # an RDD of Vectors 

pDataI = mat.zipWithIndex().filter(lambda x: x[1] > 0).map(lambda x: x[0]) 
summary = Statistics.colStats(pDataI) 

print(summary.mean()) # a dense vector containing the mean value for each column 
print(summary.variance()) # column-wise variance 
print(summary.numNonzeros()) # number of nonzeros in each column 

print pDataI.stats(); 

打印:

[ 2. 20. 200.] 

[ 1.00000000e+00 1.00000000e+02 1.00000000e+04] 

[ 3. 3. 3.] 

(count: 3, mean: [ 2. 20. 200.], stdev: [ 0.81649658 8.16496581 81.64965809], max: [ 3. 30. 300.], min: [ 1. 10. 100.]) 

这能在刚刚处理mat集合在单次改进?

+0

为什么不直接使用'pyspark.DataFrame'? – Grr

+0

@Grr如何将mat转换为pyspark.DataFrame? –

回答

0

对于我的钱DataFrames始终是RDD的更好解决方案。我将假设在一个真实的工作环境中,你的数据存储在某种文件或者集合中,或者任何其他的东西(csv,parquet,json等)。现在假设一个csv。在这种情况下,你只需做到以下几点:

df = sqlContext.read.csv('filename.csv', header=True) 

从那里你可以得到df.describe()汇总统计。

Here对于DataFrameReader包括所有的文件类型和集合,你可以在阅读文档。