0

TL;博士

我在火花2.10拟合线性回归模型 - 使用StringIndexer后和OneHotEncoder我有个〜44元件具有载体。对于我想要预测的新数据,我如何从新的数据元素创建一个特征向量?火花ML - 创建从新数据元素的特征向量来预测

更多详细

首先,这完全是人为的例子来学习如何做到这一点。使用日志与字段:

"elapsed_time", "api_name", "method", and "status_code" 

我们将创建标签elapsed_time的模型,并用其他的领域我们的功能集。完整的代码将在下面分享。

步骤 - 凝聚

  1. 阅读我们的数据到数据帧
  2. 指数每使用StringIndexer我们的特色与OneHotEncoder
  3. OneHotEncode索引功能与VectorAssembler
  4. 创建我们的特色载体将数据分解为训练和测试集
  5. 拟合模型&预测上测试数据

结果是可怕的,但就像我说的,这是一个人为的锻炼......

我需要学习如何做

如果一个新的日志项排在什么以流媒体应用为例,我将如何从新数据创建一个特征向量并将其传递给predict()?

新的日志条目可能是:

{API_NAME “:”/ sample_api_1/V2" , “方法”: “GET”, “STATUS_CODE”: “200”, “ELAPSED_TIME”:39}

后VectorAssembler

status_code_vector

(14,[0],[1.0]) 

api_name_vector

(27,[0],[1.0]) 

method_vector

(3,[0],[1.0]) 

功能载体

(44,[0,14,41],[1.0,1.0,1.0]) 

乐典

%spark 

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler, StringIndexerModel, VectorSlicer} 
import org.apache.spark.ml.linalg.Vectors 
import org.apache.spark.ml.regression.LinearRegression 
import org.apache.spark.sql.DataFrame 

val logs = sc.textFile("/Users/z001vmk/data/sample_102M.txt") 
val dfLogsRaw: DataFrame = spark.read.json(logs) 

val dfLogsFiltered = dfLogsRaw.filter("status_code != 314").drop("extra_column") 

// Create DF with our fields of concern. 
val dfFeatures: DataFrame = dfLogsFiltered.select("elapsed_time", "api_name", "method", "status_code") 

// Contrived goal: 
// Use elapsed time as our label given features api_name, status_code, & method. 
// Train model on small (100Mb) dataset 
// Be able to predict elapsed_time given a new record similar to this example: 
// --> {api_name":"/sample_api_1/v2","method":"GET","status_code":"200","elapsed_time":39} 

// Indexers 
val statusCodeIdxr: StringIndexer = new StringIndexer().setInputCol("status_code").setOutputCol("status_code_idx").setHandleInvalid("skip") 
val apiNameIdxr: StringIndexer = new StringIndexer().setInputCol("api_name").setOutputCol("api_name_idx").setHandleInvalid("skip") 
val methodIdxr: StringIndexer = new StringIndexer().setInputCol("method").setOutputCol("method_idx").setHandleInvalid("skip") 
// Index features: 
val dfIndexed0: DataFrame = statusCodeIdxr.fit(dfFeatures).transform(dfFeatures) 
val dfIndexed1: DataFrame = apiNameIdxr.fit(dfIndexed0).transform(dfIndexed0) 
val indexed: DataFrame = methodIdxr.fit(dfIndexed1).transform(dfIndexed1) 
// OneHotEncoders 
val statusCodeEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(statusCodeIdxr.getOutputCol).setOutputCol("status_code_vec") 
val apiNameEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(apiNameIdxr.getOutputCol).setOutputCol("api_name_vec") 
val methodEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(methodIdxr.getOutputCol).setOutputCol("method_vec") 
// Encode feature vectors 
val encoded0: DataFrame = statusCodeEncoder.transform(indexed) 
val encoded1: DataFrame = apiNameEncoder.transform(encoded0) 
val encoded: DataFrame = methodEncoder.transform(encoded1) 
// Limit our dataset to necessary elements: 
val dataset0 = encoded.select("elapsed_time", "status_code_vec", "api_name_vec", "method_vec").withColumnRenamed("elapsed_time", "label") 

// Assemble feature vectors 
val assembler: VectorAssembler = new VectorAssembler().setInputCols(Array("status_code_vec", "api_name_vec", "method_vec")).setOutputCol("features") 
val dataset1 = assembler.transform(dataset0) 
dataset1.show(5,false) 
// Prepare the dataset for training (optional): 
val dataset: DataFrame = dataset1.select("label", "features") 
dataset.show(3,false) 

val Array(training, test) = dataset.randomSplit(Array(0.8, 0.2)) 

// Create our Linear Regression Model 
val lr: LinearRegression = new LinearRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8).setLabelCol("label").setFeaturesCol("features") 
val lrModel = lr.fit(training) 

val predictions = lrModel.transform(test) 
predictions.show(20,false) 

这可如果你有兴趣都可以粘贴到齐柏林笔记本。

结束语

所以,我一直在淘有关的是如何将新数据转换成〜35ish元素特征向量与和使用模型拟合训练数据转换,并得到预测。我怀疑有元数据存在于模型本身中,或者在这种情况下需要从StringIndexers进行维护 - 但这是我无法找到的。

很高兴指出文档或示例 - 所有帮助表示赞赏。

谢谢!

回答

1

在走下使用PipelineModel的道路之后,这变得非常简单。给@tadamhicks的帽子提示让我更早地看看piplines。

下面是一个更新后的代码块,它基本上执行与上述相同的模型创建,拟合和预测,但是这样做使用了管道,并且在新建数据帧时预测新模型如何预测新数据。

可能有更清晰的方式来重新命名/创建我们的标签列,但我们会将其作为未来的增强功能。

%spark 

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler, StringIndexerModel, VectorSlicer} 
import org.apache.spark.ml.{Pipeline, PipelineModel} 
import org.apache.spark.ml.linalg.Vectors 
import org.apache.spark.ml.regression.LinearRegression 
import org.apache.spark.sql.DataFrame 

val logs = sc.textFile("/data/sample_102M.txt") 
val dfLogsRaw: DataFrame = spark.read.json(logs) 

val dfLogsFiltered = dfLogsRaw.filter("status_code != 314").drop("extra_column") 
           .select("elapsed_time", "api_name", "method", "status_code","cache_status") 
           .withColumnRenamed("elapsed_time", "label") 
val Array(training, test) = dfLogsFiltered.randomSplit(Array(0.8, 0.2)) 

// Indexers 
val statusCodeIdxr: StringIndexer = new StringIndexer().setInputCol("status_code").setOutputCol("status_code_idx").setHandleInvalid("skip") 
val apiNameIdxr: StringIndexer = new StringIndexer().setInputCol("api_name").setOutputCol("api_name_idx").setHandleInvalid("skip") 
val methodIdxr: StringIndexer = new StringIndexer().setInputCol("method").setOutputCol("method_idx").setHandleInvalid("skip")//"cache_status" 
val cacheStatusIdxr: StringIndexer = new StringIndexer().setInputCol("cache_status").setOutputCol("cache_status_idx").setHandleInvalid("skip") 
// OneHotEncoders 
val statusCodeEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(statusCodeIdxr.getOutputCol).setOutputCol("status_code_vec") 
val apiNameEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(apiNameIdxr.getOutputCol).setOutputCol("api_name_vec") 
val methodEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(methodIdxr.getOutputCol).setOutputCol("method_vec") 
val cacheStatusEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(cacheStatusIdxr.getOutputCol).setOutputCol("cache_status_vec") 
// Vector Assembler 
val assembler: VectorAssembler = new VectorAssembler().setInputCols(Array("status_code_vec", "api_name_vec", "method_vec", "cache_status_vec")).setOutputCol("features") 

val lr: LinearRegression = new LinearRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8).setLabelCol("label").setFeaturesCol("features") 

val pipeline = new Pipeline().setStages(Array(statusCodeIdxr, apiNameIdxr, methodIdxr, cacheStatusIdxr, statusCodeEncoder, apiNameEncoder, methodEncoder, cacheStatusEncoder, assembler, lr)) 

val plModel: PipelineModel = pipeline.fit(training) 

plModel.write.overwrite().save("/tmp/spark-linear-regression-model") 

plModel.transform(test).select("label", "prediction").show(5,false) 

val dataElement: String = """{"api_name":"/sample_api/v2","method":"GET","status_code":"200","cache_status":"MISS","elapsed_time":39}""" 

val newDataRDD = spark.sparkContext.makeRDD(dataElement :: Nil) 
val newData = spark.read.json(newDataRDD).withColumnRenamed("elapsed_time", "label") 

val loadedPlModel = PipelineModel.load("/tmp/spark-linear-regression-model") 
loadedPlModel.transform(newData).select("label", "prediction").show 
+0

感谢您的问题和答案 - 非常有帮助! –

+0

您有机会测试新数据的预测结果吗?实际上,我想知道如何将StringIndexers设置为.setHandleInvalid(“skip”)时如何处理新数据。我认为它会跳过新的数据,并且不会对它进行索引,因此预测将是错误的 –

1

简短回答:管道模型。

尽管如此,为了确保自己明白,如果不需要,在启动应用程序时不想创建模型。除非你打算使用DataSets和反馈,否则这很愚蠢。在Spark Submit会话中创建你的模型(或者使用像Zeppelin这样的笔记本会话)并保存下来。这是你的数据科学。

大多数DS员工将模型交给他们,让DevOps/Data Engineers使用它。他们所要做的就是在对象加载到内存后调用一个.predict()。

+0

确实。模型(待培训/适合以后)将与Streaming应用打包,以查看批量新数据。无论如何...当你说话的时候,我们说叫Object.predict - 是的,我正在寻找如何旋转所述对象。可以应用用于创建预测给定模型的对象的方法和机制。这是我在我尚未填写的文档中发现缺陷的地方。 – reverend

+0

啊,好吧,看着管道是后来的TODO项目,但在开始解析事情后,我想我开始看到PipelineModel或拟合管道的路径。当我了解更多信息时,我会更新此内容 - 感谢您推动这个方向! – reverend