
Hi everyone, I want to combine an RDD[Vector] and an RDD[Int] into a single RDD[Vector]. I use KMeans to predict the clusters, and the idea is to append the corresponding cluster to the end of each vector. Here is how I merged the two RDDs of different types:

import org.apache.spark.sql.SparkSession
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val spark = SparkSession.builder.master("local").appName("my-spark-app").getOrCreate()
val data = spark.sparkContext.textFile("C:/spark/data/mllib/kmeans_data.txt")

// Cluster the data into two classes using KMeans
val numClusters = 2
val numIterations = 20
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache() // RDD[Vector]
val clusters = KMeans.train(parsedData, numClusters, numIterations)
val resultatOfprediction = clusters.predict(parsedData) // RDD[Int]
val finalData = parsedData.zip(resultatOfprediction)
finalData.collect().foreach(println)

The result is:

([0.0,0.0,0.0],0) 
([0.1,0.1,0.1],0) 
([0.2,0.2,0.2],0) 
([9.0,9.0,9.0],1) 
([9.1,9.1,9.1],1) 
([9.2,9.2,9.2],1) 

The output I want is:

[0.0,0.0,0.0,1.0]
[0.1,0.1,0.1,1.0]
[0.2,0.2,0.2,1.0]
[9.0,9.0,9.0,0.0]
[9.1,9.1,9.1,0.0]
[9.2,9.2,9.2,0.0]

The goal is to have a final RDD[Vector] that I can save to a txt file, but the result you provided is not an RDD[Vector].
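Concretely, starting from finalData above, something like this would do it (a minimal sketch using the imports from the code above; the output path is only a placeholder):

// Append each predicted cluster to its vector, yielding an RDD[Vector]
val finalVectors = finalData.map { case (vector, cluster) =>
  Vectors.dense(vector.toArray :+ cluster.toDouble)
}
// A dense Vector prints as [0.0,0.0,0.0,1.0], one per line in the saved file
finalVectors.saveAsTextFile("C:/spark/output/clustered") // placeholder path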

Answer

To get the output you want, you need to zip the two RDDs. Here is how you can do it:

// Mock data standing in for the RDDs from the question: here parsedData
// holds the predicted cluster ids (as Doubles)...
val parsedData = spark.sparkContext.parallelize(Seq(1.0, 1.0, 1.0, 0.0, 0.0, 0.0))

// ...and resultatOfprediction holds the feature values as tuples
val resultatOfprediction = spark.sparkContext.parallelize(Seq(
    (0.0, 0.0, 0.0),
    (0.1, 0.1, 0.1),
    (0.2, 0.2, 0.2),
    (9.0, 9.0, 9.0),
    (9.1, 9.1, 9.1),
    (9.2, 9.2, 9.2)
))

// Pair each tuple with its prediction
resultatOfprediction.zip(parsedData)

Since zip returns an RDD of tuples, you can get the result as:

resultatOfprediction.zip(parsedData) 
     .map(t => (t._1._1, t._1._2, t._1._3, t._2)) 
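With the sample data above, collecting that result should print the four-element tuples:

resultatOfprediction.zip(parsedData)
    .map(t => (t._1._1, t._1._2, t._1._3, t._2))
    .collect().foreach(println)
// (0.0,0.0,0.0,1.0)
// (0.1,0.1,0.1,1.0)
// (0.2,0.2,0.2,1.0)
// (9.0,9.0,9.0,0.0)
// (9.1,9.1,9.1,0.0)
// (9.2,9.2,9.2,0.0)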

To make this work for tuples of any arity, you can do the following, as suggested by @Rahul-Sukla:

resultatOfprediction.zip(parsedData)
    .map(t => t._1.productIterator.toList.map(_.asInstanceOf[Double]) :+ t._2)
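Since the question ultimately needs an RDD[Vector], the same trick can feed Vectors.dense (a sketch, assuming the org.apache.spark.mllib.linalg.Vectors import); the result can then be saved with saveAsTextFile as shown in the question:

resultatOfprediction.zip(parsedData)
    .map(t => Vectors.dense(
      // Flatten the tuple to a List[Double], append the prediction, build a Vector
      (t._1.productIterator.toList.map(_.asInstanceOf[Double]) :+ t._2).toArray
    ))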

Hope this helps!

Please check the update, thanks –

Check the updated answer –

I still didn't get a correct answer; could you please provide another answer? Thanks –