2017-03-08

I am using PCA for data analysis. I wrote this code in PySpark and it works perfectly, but only for data read from a CSV file with exactly five columns ["a", "b", "c", "d", "e"]. I want to write generic code that computes PCA for any number of columns read from a CSV file. What should I add? Here is my code:

#########################! importing libraries !######################## 
from __future__ import print_function 
from pyspark.sql import SparkSession 
from pyspark.ml.feature import PCA, VectorAssembler 
from pyspark.ml import Pipeline 
########################! main script !################################# 
if __name__ == "__main__": 
    spark = SparkSession\ 
     .builder\ 
     .appName("PCAExample")\ 
     .getOrCreate() 
    sc = spark.sparkContext 
    data = sc.textFile('dataset.csv') \ 
     .map(lambda line: [float(k) for k in line.split(';')])\ 
     .collect() 
    df = spark.createDataFrame(data, ["a","b","c","d","e"]) 
    df.show() 
    vecAssembler = VectorAssembler(inputCols=["a","b","c","d","e"], outputCol="features") 

    pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures") 
    pipeline = Pipeline(stages=[vecAssembler, pca]) 
    model = pipeline.fit(df) 
    result = model.transform(df).select("pcaFeatures") 
    result.show(truncate=False) 
    spark.stop() 

Answer

You need to make your code generic by changing a few lines:

fileObj = sc.textFile('dataset.csv') 
data = fileObj.map(lambda line: [float(k) for k in line.split(';')]).collect() 
columns = (fileObj.first()).split() 
df = spark.createDataFrame(data, columns) 
df.show() 
vecAssembler = VectorAssembler(inputCols=columns, outputCol="features") 
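The step that makes this generic is recovering the column names from the file's first line. A minimal, Spark-free sketch of that parsing logic (assuming, as in the question, that fields are separated by ';' and the first line holds the header):

```python
def parse_csv(text, delimiter=';'):
    """Split raw CSV text into column names (first line) and float-valued rows."""
    lines = [ln for ln in text.strip().splitlines() if ln]
    columns = lines[0].split(delimiter)
    # Skip the header row before casting to float, otherwise float() raises ValueError.
    rows = [[float(v) for v in ln.split(delimiter)] for ln in lines[1:]]
    return columns, rows

columns, rows = parse_csv("a;b;c\n1;2;3\n4;5;6")
# columns == ['a', 'b', 'c']
# rows == [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]
```

With `columns` recovered this way, `spark.createDataFrame(rows, columns)` and `VectorAssembler(inputCols=columns, outputCol="features")` work for any number of columns. Note that the answer's code above maps every line through `float()`, so if the file really has a textual header line, it must be filtered out of `data` as well.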
It was just missing the ';' in `(fileObj.first()).split()`; with that it works perfectly :D Thanks –

@MehdiBenHamida That depends on your format (the delimiter); I assumed a space as the delimiter. Anyway, –

Thanks :D –
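On the delimiter point raised in the comments: instead of hard-coding ';' or a space, the separator can be detected from a sample of the file with the standard library's `csv.Sniffer`. A small sketch (the sample string here is made up for illustration):

```python
import csv

sample = "a;b;c\n1;2;3\n4;5;6\n"
# Restrict the candidate set so the sniffer cannot guess an unrelated character.
dialect = csv.Sniffer().sniff(sample, delimiters=";,\t")
print(dialect.delimiter)  # ';' for this sample
```

In the Spark code above, `sample` could be built from `fileObj.take(5)` joined with newlines, and the detected `dialect.delimiter` passed to both the `line.split(...)` call and the header split.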