2017-06-15 70 views
2

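Joining two Spark mllib pipelines together

I have two separate DataFrames, each of which goes through several different processing stages that I handle with mllib transformers in a pipeline.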

I now want to join these two pipelines together, keeping the features (columns) from each DataFrame.

Scikit-learn has the FeatureUnion class for handling this, but I can't seem to find an equivalent in mllib.

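I could add a custom transformer stage at the end of one pipeline that takes the DataFrame produced by the other pipeline as an attribute and joins it in its transform method, but that seems messy.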

+0

Is it a join or a union that you are looking for? Both can be done with DataFrames. – jamborta

+0

@jamborta It is a join, but I would like to do it as a pipeline stage so that I get schema checking across the whole pipeline. – Anake

Answers

3

Pipeline and PipelineModel are both valid PipelineStages, so they can be combined in a single Pipeline. For example, with:

from pyspark.ml import Pipeline 
from pyspark.ml.feature import VectorAssembler 

df = spark.createDataFrame([ 
    (1.0, 0, 1, 1, 0), 
    (0.0, 1, 0, 0, 1) 
], ("label", "x1", "x2", "x3", "x4")) 

pipeline1 = Pipeline(stages=[ 
    VectorAssembler(inputCols=["x1", "x2"], outputCol="features1") 
]) 

pipeline2 = Pipeline(stages=[ 
    VectorAssembler(inputCols=["x3", "x4"], outputCol="features2") 
]) 

you can combine Pipelines:

Pipeline(stages=[ 
    pipeline1, pipeline2, 
    VectorAssembler(inputCols=["features1", "features2"], outputCol="features") 
]).fit(df).transform(df).show(truncate=False)
+-----+---+---+---+---+---------+---------+-----------------+
|label|x1 |x2 |x3 |x4 |features1|features2|features         |
+-----+---+---+---+---+---------+---------+-----------------+
|1.0  |0  |1  |1  |0  |[0.0,1.0]|[1.0,0.0]|[0.0,1.0,1.0,0.0]|
|0.0  |1  |0  |0  |1  |[1.0,0.0]|[0.0,1.0]|[1.0,0.0,0.0,1.0]|
+-----+---+---+---+---+---------+---------+-----------------+

or pre-fitted PipelineModels:

model1 = pipeline1.fit(df) 
model2 = pipeline2.fit(df) 

Pipeline(stages=[ 
    model1, model2, 
    VectorAssembler(inputCols=["features1", "features2"], outputCol="features") 
]).fit(df).transform(df).show()
+-----+---+---+---+---+---------+---------+-----------------+
|label| x1| x2| x3| x4|features1|features2|         features|
+-----+---+---+---+---+---------+---------+-----------------+
|  1.0|  0|  1|  1|  0|[0.0,1.0]|[1.0,0.0]|[0.0,1.0,1.0,0.0]|
|  0.0|  1|  0|  0|  1|[1.0,0.0]|[0.0,1.0]|[1.0,0.0,0.0,1.0]|
+-----+---+---+---+---+---------+---------+-----------------+

So the approach I would recommend is to join the data beforehand, and then fit and transform the whole DataFrame.