2015-08-20 70 views
4

我想通过组(School_ID,超过3千)在使用Spark Scala API的大型模型输入csv文件上构建随机森林模型。每个组包含大约3000-4000条记录。我拥有的资源是20-30 aws m3.2xlarge的实例。运行3000+随机森林模型使用Spark MLlib Scala API

在R,我可以按组构建模型,并将其保存到列表像这个 -

library(dplyr);library(randomForest); 
    Rf_model <- train %>% group_by(School_ID) %>% 
       do(school= randomForest(formula=Rf_formula, data=., importance = TRUE)) 

列表可以某处被保存,我可以给他们打电话,当我需要使用它们像下面 -

save(Rf_model.school,file=paste0(Modelpath,"Rf_model.dat")) 
load(file=paste0(Modelpath,"Rf_model.dat")) 
pred <- predict(Rf_model.school$school[school_index][[1]], newdata=test) 

我想知道如何在Spark中做到这一点,无论我是否需要首先按组拆分数据,以及如何在需要时有效地做到这一点。

我能够通过School_ID根据下面的代码拆分文件,但它似乎创建了一个单独的作业,为每个迭代子集并花费很长时间才能完成作业。有一种方法可以一次完成吗?

model_input.cache() 

val schools = model_input.select("School_ID").distinct.collect.flatMap(_.toSeq) 
val bySchoolArray = schools.map(School_ID => model_input.where($"School_ID" <=> School_ID)) 

for(i <- 0 to programs.length - 1){ 
    bySchoolArray(i). 
    write.format("com.databricks.spark.csv"). 
    option("header", "true"). 
    save("model_input_bySchool/model_input_"+ schools(i)) 
} 

来源: How can I split a dataframe into dataframes with same column values in SCALA and SPARK

编辑2015年8月24日 我想我的数据帧转换成由随机森林模型接受的格式。我遵循这个线程的指令 How to create correct data frame for classification in Spark ML

基本上,我创建了一个新的变量“标签”,并将我的类存储在Double中。然后,我结合使用VectorAssembler功能我所有的功能和改变我输入的数据如下 -

val assembler = new VectorAssembler(). 
    setInputCols(Array("COL1", "COL2", "COL3")). 
    setOutputCol("features") 

val model_input = assembler.transform(model_input_raw). 
    select("SCHOOL_ID", "label", "features") 

部分错误消息(让我知道如果你需要完整的日志消息) -

scala.MatchError: StringType (of class org.apache.spark.sql.types.StringType$) at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:57)

这是解决将所有变量转换为数字类型后。

编辑2015年8月25日 的毫升模式不接受我手工编码,所以我需要使用StringIndexer去解决问题指示here标签。根据official documentation,最频繁的标签得到0.它会导致School_ID之间的标签不一致。我想知道是否有一种方法可以在不重置值的顺序的情况下创建标签。

val indexer = new StringIndexer(). 
    setInputCol("label_orig"). 
    setOutputCol("label") 

任何建议或指示将是有益的,随时提出任何问题。谢谢!

回答

4

既然你已经有了每个学校的单独的数据框架,这里没有太多的工作要做。由于你的数据帧我假设你想使用ml.classification.RandomForestClassifier。如果是这样,你可以尝试这样的事情:

  1. 提取流水线逻辑。根据您的要求在每个子集

    import org.apache.spark.sql.DataFrame 
    import org.apache.spark.ml.classification.RandomForestClassifier 
    import org.apache.spark.ml.{Pipeline, PipelineModel} 
    
    def trainModel(df: DataFrame): PipelineModel = { 
        val rf = new RandomForestClassifier() 
        val pipeline = new Pipeline().setStages(Array(rf)) 
        pipeline.fit(df) 
    } 
    
  2. 火车模型

    val bySchoolArrayModels = bySchoolArray.map(df => trainModel(df)) 
    
  3. 保存模型

    import java.io._ 
    
    def saveModel(name: String, model: PipelineModel) = { 
        val oos = new ObjectOutputStream(new FileOutputStream(s"/some/path/$name")) 
        oos.writeObject(model) 
        oos.close 
    } 
    
    schools.zip(bySchoolArrayModels).foreach{ 
        case (name, model) => saveModel(name, Model) 
    } 
    
  4. 可选调整RandomForestClassifier参数和变压器:由于个别子相当小你可以尝试一种类似于我的方法描述here要同时提交多个任务。

  5. 如果您使用mllib.tree.model.RandomForestModel,则可以省略3.并直接使用model.save。由于反序列化似乎存在一些问题(How to deserialize Pipeline model in spark.ml? - 据我所知,我认为它可以很好但安全得比对不起,我猜),这可能是一个首选方法。

编辑

根据the official documentation

VectorAssembler accepts the following input column types: all numeric types, boolean type, and vector type.

由于错误表明你的列是一个String你首先应该改造它,例如使用StringIndexer

+0

谢谢!这很有帮助!我试图通过创建一个类型为Double的变量“label”并使用VectorAssembler函数结合这些特性来将我的数据框转换为随机森林模型所接受的格式。但是,scala控制台指示MatchError:StringType。你知道我在这里做错了吗?我在帖子中提供了更详细的信息。 –

+0

请参阅编辑。 – zero323

+0

再次感谢!将所有变量转换为数字类型后,错误消失。 –