
Loading a persisted CrossValidatorModel throws "Param numTrees does not exist" error

I am using Spark 2.0 to build a RandomForestClassifier for a multiclass classification problem. I can train the model successfully and save it to an S3 bucket with model.save(). However, when I load this model back with load(), I get the following error:

Exception in thread "main" java.util.NoSuchElementException: Param numTrees does not exist. 
    at org.apache.spark.ml.param.Params$$anonfun$getParam$2.apply(params.scala:609) 
    at org.apache.spark.ml.param.Params$$anonfun$getParam$2.apply(params.scala:609) 
    at scala.Option.getOrElse(Option.scala:121) 
    at org.apache.spark.ml.param.Params$class.getParam(params.scala:608) 
    at org.apache.spark.ml.PipelineStage.getParam(Pipeline.scala:42) 
    at org.apache.spark.ml.util.DefaultParamsReader$$anonfun$getAndSetParams$1.apply(ReadWrite.scala:430) 
    at org.apache.spark.ml.util.DefaultParamsReader$$anonfun$getAndSetParams$1.apply(ReadWrite.scala:429) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at org.apache.spark.ml.util.DefaultParamsReader$.getAndSetParams(ReadWrite.scala:429) 
    at org.apache.spark.ml.classification.RandomForestClassificationModel$RandomForestClassificationModelReader.load(RandomForestClassifier.scala:310) 
    at org.apache.spark.ml.classification.RandomForestClassificationModel$RandomForestClassificationModelReader.load(RandomForestClassifier.scala:284) 
    at org.apache.spark.ml.util.DefaultParamsReader$.loadParamsInstance(ReadWrite.scala:447) 
    at org.apache.spark.ml.Pipeline$SharedReadWrite$$anonfun$4.apply(Pipeline.scala:267) 
    at org.apache.spark.ml.Pipeline$SharedReadWrite$$anonfun$4.apply(Pipeline.scala:265) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) 
    at org.apache.spark.ml.Pipeline$SharedReadWrite$.load(Pipeline.scala:265) 
    at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:341) 
    at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:335) 
    at org.apache.spark.ml.util.DefaultParamsReader$.loadParamsInstance(ReadWrite.scala:447) 
    at org.apache.spark.ml.tuning.CrossValidatorModel$CrossValidatorModelReader.load(CrossValidator.scala:269) 
    at org.apache.spark.ml.tuning.CrossValidatorModel$CrossValidatorModelReader.load(CrossValidator.scala:256) 
    at org.apache.spark.ml.util.MLReadable$class.load(ReadWrite.scala:227) 
    at org.apache.spark.ml.tuning.CrossValidatorModel$.load(CrossValidator.scala:240) 
    at org.apache.spark.ml.tuning.CrossValidatorModel.load(CrossValidator.scala) 

Below is the code snippet I use to train and save the model:

import java.util.ArrayList

import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, VectorAssembler}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

//assemble the input feature columns into a single feature vector
val assembler = new VectorAssembler()
     .setInputCols(inputColumnNames)
     .setOutputCol("Inputs_Indexed");


//split 70:30 training and test data 
val Array(trainingData, testData) = df.randomSplit(Array(0.7, 0.3)) 

//train using RandomForest Model 
val rf = new RandomForestClassifier() 
     .setLabelCol("Facing_Indexed") 
     .setFeaturesCol("Inputs_Indexed") 
     .setNumTrees(500); 

val labelConverter = new IndexToString() 
       .setInputCol("prediction") 
       .setOutputCol("predictedLabel") 
       .setLabels(labelIndexer.labels); 

val stageList = new ArrayList[PipelineStage]; 
stageList.addAll(categoricalInputModels); 
stageList.add(labelIndexer); 
stageList.add(assembler); 
stageList.add(rf); 
stageList.add(labelConverter); 

//convert stages list to array 
val stages = new Array[PipelineStage](stageList.size); 
stageList.toArray(stages); 

val pipeline = new Pipeline().setStages(stages) 

val paramGrid = new ParamGridBuilder().addGrid(rf.maxDepth, Array(3, 5, 8)).build() 

val evaluator = new MulticlassClassificationEvaluator() 
    .setLabelCol("Facing_Indexed") 
    .setPredictionCol("prediction") 
    .setMetricName("accuracy") 

val cv = new CrossValidator() 
    .setEstimator(pipeline) 
    .setEvaluator(evaluator) 
    .setEstimatorParamMaps(paramGrid) 

val model = cv.fit(trainingData) 

val predictions = model.transform(testData); 

predictions.select("predictedLabel", "Facing", "Inputs_Indexed").show(5); 

val accuracy = evaluator.evaluate(predictions) 
println("Test Error = " + (1.0 - accuracy)) 

model.save("s3n://xyz_path/au.model") 

After the trained model is saved, I load it in a separate Java program with CrossValidatorModel.load("s3n://xyz_path/au.model"), which throws the error above. In my S3 bucket I can see the saved, serialized model. I am not sure where it is going wrong. Any help with this error is appreciated.
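For reference, the loading side of my Java program looks roughly like the sketch below. This is a minimal sketch: the class name and the input Parquet path are placeholders I made up for illustration; only the model path and the "predictedLabel" column come from the code above.

public class LoadPersistedModel {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("LoadPersistedModel")
                .getOrCreate();

        // Load the persisted CrossValidatorModel from S3 (same path used by save())
        CrossValidatorModel model = CrossValidatorModel.load("s3n://xyz_path/au.model");

        // Score new data with the best model found during cross-validation;
        // the input path here is a hypothetical placeholder
        Dataset<Row> input = spark.read().parquet("s3n://xyz_path/new_data");
        Dataset<Row> predictions = model.transform(input);
        predictions.select("predictedLabel").show(5);

        spark.stop();
    }
}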

Answer


I figured out what the problem was. The AWS EMR cluster was running Spark 2.1.0, which is what I used to train and save my model to the S3 bucket, but my Java program was pointing at version 2.0.0 of Spark MLlib. There was a breaking change involving the "numTrees" Param in RandomForestClassificationModel, reported in the 2.0-to-2.1 migration guide here: http://spark.apache.org/docs/latest/ml-guide.html#from-20-to-21
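A quick way to catch this kind of mismatch is to print the runtime Spark version in both the training and loading environments and compare it against the MLlib artifact version in the build. A minimal sketch (the class name is an assumption for illustration):

public class SparkVersionCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SparkVersionCheck")
                .getOrCreate();

        // version() reports the Spark version actually on the classpath/cluster,
        // e.g. "2.1.0"; it should match between the environment that saved the
        // model and the one that loads it
        System.out.println("Runtime Spark version: " + spark.version());

        spark.stop();
    }
}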

So, finally, I updated the Spark MLlib Maven dependency in my Java project to version 2.1.0:

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-mllib_2.11</artifactId> 
    <version>2.1.0</version> 
</dependency> 

Then it complained about another missing class:

java.lang.NoClassDefFoundError: org/codehaus/commons/compiler/UncheckedCompileException 

This was fixed once I added the commons-compiler dependency:

<dependency> 
    <groupId>org.codehaus.janino</groupId> 
    <artifactId>commons-compiler</artifactId> 
    <version>2.7.8</version> 
</dependency> 

And that is how my persisted model was finally loaded successfully!