2017-04-21 62 views
0
#Load the CSV file into a RDD
    # NOTE(review): `sc` (SparkContext) is assumed to already exist in the
    # session (e.g. a pyspark shell); it is never created in this file.
    irisData = sc.textFile("/home/infademo/surya/iris.csv")
    irisData.cache()
    # count() forces evaluation so the cache above is actually populated
    irisData.count()

    #Remove the first line (contains headers)
    # Header detection relies on the literal "Sepal" appearing only in the
    # header row of iris.csv -- TODO confirm against the actual file.
    dataLines = irisData.filter(lambda x: "Sepal" not in x)
    dataLines.count()

    from pyspark.sql import Row
    #Create a Data Frame from the data
    # Split each CSV line into fields; columns are (in order):
    # sepal length, sepal width, petal length, petal width, species.
    parts = dataLines.map(lambda l: l.split(","))
    # Build one Row per record, converting the four measurements to float
    # and keeping the species name as a string.
    irisMap = parts.map(lambda p: Row(SEPAL_LENGTH=float(p[0]),\
            SEPAL_WIDTH=float(p[1]), \
            PETAL_LENGTH=float(p[2]), \
            PETAL_WIDTH=float(p[3]), \
            SPECIES=p[4]))

    # Infer the schema, and register the DataFrame as a table.
    # NOTE(review): `sqlContext` is assumed to exist in the session,
    # like `sc` above.
    irisDf = sqlContext.createDataFrame(irisMap)
    irisDf.cache()

    #Add a numeric indexer for the label/target column
    # StringIndexer maps each distinct SPECIES string to a double index
    # (0.0, 1.0, ...) in IND_SPECIES, and attaches ML metadata to the
    # output column so classifiers can discover the number of classes.
    from pyspark.ml.feature import StringIndexer
    stringIndexer = StringIndexer(inputCol="SPECIES", outputCol="IND_SPECIES")
    si_model = stringIndexer.fit(irisDf)
    irisNormDf = si_model.transform(irisDf)

    # Show the species -> index mapping that was produced
    irisNormDf.select("SPECIES","IND_SPECIES").distinct().collect()
    irisNormDf.cache()

    """-------------------------------------------------------------------------- 
    Perform Data Analytics 
    -------------------------------------------------------------------------""" 

    #See standard parameters 
    irisNormDf.describe().show() 

    #Find correlation between predictors and target 
    for i in irisNormDf.columns: 
     if not(isinstance(irisNormDf.select(i).take(1)[0][0], basestring)) : 
      print("Correlation to Species for ", i, \ 
         irisNormDf.stat.corr('IND_SPECIES',i)) 



    #Transform to a Data Frame for input to Machine Learning
    #Drop columns that are not required (low correlation)

    #FIX: the original block contained a *Scala* import line
    #(`import org.apache.spark.mllib.linalg.{Matrix, Matrices}`), which is a
    #SyntaxError in Python, and a bare `pyspark.mllib.linalg.Vector`
    #expression, which raises NameError (pyspark itself was never imported).
    #Both are removed; the valid Python imports are kept.
    from pyspark.mllib.linalg import Vectors
    from pyspark.mllib.linalg import SparseVector
    from pyspark.mllib.regression import LabeledPoint
    from pyspark.mllib.util import MLUtils
    from pyspark.mllib.linalg.distributed import RowMatrix

    # NOTE(review): this shadows the mllib Vectors imported above, so
    # transformToLabeledPoint() below uses the ml.linalg version.
    # pyspark.ml.linalg only exists from Spark 2.0 on; the traceback in this
    # page is from Spark 1.6.1, where this import itself would fail -- on 1.6
    # drop this line and use the mllib Vectors instead.
    from pyspark.ml.linalg import Vectors
    def transformToLabeledPoint(row):
        """Turn one DataFrame Row into a (species, label, features) triple.

        `features` packs the four measurement columns into a dense vector;
        `Vectors` is taken from the enclosing script's imports.
        """
        feature_columns = ("SEPAL_LENGTH", "SEPAL_WIDTH",
                           "PETAL_LENGTH", "PETAL_WIDTH")
        features = Vectors.dense([row[name] for name in feature_columns])
        return (row["SPECIES"], row["IND_SPECIES"], features)




    irisLp = irisNormDf.rdd.map(transformToLabeledPoint) 
    irisLpDf = sqlContext.createDataFrame(irisLp,["species","label", "features"]) 
    irisLpDf.select("species","label","features").show(10) 
    irisLpDf.cache() 

    """-------------------------------------------------------------------------- 
    Perform Machine Learning 
    -------------------------------------------------------------------------""" 
    #Split into training and testing data 
    (trainingData, testData) = irisLpDf.randomSplit([0.9, 0.1]) 
    trainingData.count() 
    testData.count() 
    testData.collect() 

    from pyspark.ml.classification import DecisionTreeClassifier 
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator 

    #Create the model 
    dtClassifer = DecisionTreeClassifier(maxDepth=2, labelCol="label",\ 
        featuresCol="features") 

    dtModel = dtClassifer.fit(trainingData) 

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/mapr/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/ml/pipeline.py", line 69, in fit
    return self._fit(dataset)
  File "/opt/mapr/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/ml/wrapper.py", line 133, in _fit
    java_model = self._fit_java(dataset)
  File "/opt/mapr/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/ml/wrapper.py", line 130, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/opt/mapr/spark/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/opt/mapr/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/utils.py", line 53, in deco
    raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: u'DecisionTreeClassifier was given input with invalid label column label, without the number of classes specified. See StringIndexer.'

回答

0

根据 Spark 1.6.1 文档:

我们使用两个特征转换器(feature transformers)来准备数据;它们帮助对标签列和分类特征进行索引,并把决策树算法能够识别的元数据添加到 DataFrame 中。

根据 Spark 1.6.1 源代码:

val numClasses: Int = MetadataUtils.getNumClasses(dataset.schema($(labelCol))) match { 
    case Some(n: Int) => n 
    case None => throw new IllegalArgumentException("DecisionTreeClassifier was given input" + 
    s" with invalid label column ${$(labelCol)}, without the number of classes" + 
    " specified. See StringIndexer.") 
    // TODO: Automatically index labels: SPARK-7126 
} 

所以,你需要传递给DecisionTreeClassifier前使用StringIndexerlabel柱和VectorIndexerfeatures列。 fit