TL;博士
我在火花2.10拟合线性回归模型 - 使用StringIndexer后和OneHotEncoder我有个〜44元件具有载体。对于我想要预测的新数据,我如何从新的数据元素创建一个特征向量?火花ML - 创建从新数据元素的特征向量来预测
更多详细
首先,这完全是人为的例子来学习如何做到这一点。使用日志与字段:
"elapsed_time", "api_name", "method", and "status_code"
我们将创建标签elapsed_time
的模型,并用其他的领域我们的功能集。完整的代码将在下面分享。
步骤 - 凝聚
- 阅读我们的数据到数据帧
- 指数每使用StringIndexer我们的特色与OneHotEncoder
- OneHotEncode索引功能与VectorAssembler
- 创建我们的特色载体将数据分解为训练和测试集
- 拟合模型&预测上测试数据
结果是可怕的,但就像我说的,这是一个人为的锻炼......
我需要学习如何做
如果一个新的日志项排在什么以流媒体应用为例,我将如何从新数据创建一个特征向量并将其传递给predict()?
新的日志条目可能是:
{API_NAME “:”/ sample_api_1/V2" , “方法”: “GET”, “STATUS_CODE”: “200”, “ELAPSED_TIME”:39}
后VectorAssembler
status_code_vector
(14,[0],[1.0])
api_name_vector
(27,[0],[1.0])
method_vector
(3,[0],[1.0])
功能载体
(44,[0,14,41],[1.0,1.0,1.0])
乐典
%spark
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler, StringIndexerModel, VectorSlicer}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.DataFrame
val logs = sc.textFile("/Users/z001vmk/data/sample_102M.txt")
val dfLogsRaw: DataFrame = spark.read.json(logs)
val dfLogsFiltered = dfLogsRaw.filter("status_code != 314").drop("extra_column")
// Create DF with our fields of concern.
val dfFeatures: DataFrame = dfLogsFiltered.select("elapsed_time", "api_name", "method", "status_code")
// Contrived goal:
// Use elapsed time as our label given features api_name, status_code, & method.
// Train model on small (100Mb) dataset
// Be able to predict elapsed_time given a new record similar to this example:
// --> {api_name":"/sample_api_1/v2","method":"GET","status_code":"200","elapsed_time":39}
// Indexers
val statusCodeIdxr: StringIndexer = new StringIndexer().setInputCol("status_code").setOutputCol("status_code_idx").setHandleInvalid("skip")
val apiNameIdxr: StringIndexer = new StringIndexer().setInputCol("api_name").setOutputCol("api_name_idx").setHandleInvalid("skip")
val methodIdxr: StringIndexer = new StringIndexer().setInputCol("method").setOutputCol("method_idx").setHandleInvalid("skip")
// Index features:
val dfIndexed0: DataFrame = statusCodeIdxr.fit(dfFeatures).transform(dfFeatures)
val dfIndexed1: DataFrame = apiNameIdxr.fit(dfIndexed0).transform(dfIndexed0)
val indexed: DataFrame = methodIdxr.fit(dfIndexed1).transform(dfIndexed1)
// OneHotEncoders
val statusCodeEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(statusCodeIdxr.getOutputCol).setOutputCol("status_code_vec")
val apiNameEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(apiNameIdxr.getOutputCol).setOutputCol("api_name_vec")
val methodEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(methodIdxr.getOutputCol).setOutputCol("method_vec")
// Encode feature vectors
val encoded0: DataFrame = statusCodeEncoder.transform(indexed)
val encoded1: DataFrame = apiNameEncoder.transform(encoded0)
val encoded: DataFrame = methodEncoder.transform(encoded1)
// Limit our dataset to necessary elements:
val dataset0 = encoded.select("elapsed_time", "status_code_vec", "api_name_vec", "method_vec").withColumnRenamed("elapsed_time", "label")
// Assemble feature vectors
val assembler: VectorAssembler = new VectorAssembler().setInputCols(Array("status_code_vec", "api_name_vec", "method_vec")).setOutputCol("features")
val dataset1 = assembler.transform(dataset0)
dataset1.show(5,false)
// Prepare the dataset for training (optional):
val dataset: DataFrame = dataset1.select("label", "features")
dataset.show(3,false)
val Array(training, test) = dataset.randomSplit(Array(0.8, 0.2))
// Create our Linear Regression Model
val lr: LinearRegression = new LinearRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8).setLabelCol("label").setFeaturesCol("features")
val lrModel = lr.fit(training)
val predictions = lrModel.transform(test)
predictions.show(20,false)
这可如果你有兴趣都可以粘贴到齐柏林笔记本。
结束语
所以,我一直在淘有关的是如何将新数据转换成〜35ish元素特征向量与和使用模型拟合训练数据转换,并得到预测。我怀疑有元数据存在于模型本身中,或者在这种情况下需要从StringIndexers进行维护 - 但这是我无法找到的。
很高兴指出文档或示例 - 所有帮助表示赞赏。
谢谢!
感谢您的问题和答案 - 非常有帮助! –
您有机会测试新数据的预测结果吗?实际上,我想知道如何将StringIndexers设置为.setHandleInvalid(“skip”)时如何处理新数据。我认为它会跳过新的数据,并且不会对它进行索引,因此预测将是错误的 –