2015-05-19

Extracting the number and the words and storing them in Scala and Spark

I have a file like the one below:

0; best wrap ear market pair pair break make 

1; time sennheiser product better earphone fit 

1; recommend headphone pretty decent full sound earbud design 

0; originally buy work gym work well robust sound quality good clip 

1; terrific sound great fit toss mine profuse sweater headphone 

0; negative experienced sit chair back touch chair earplug displace hurt 
... 

I want to extract the number and store it with each word of every document. I have tried:

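val grouped_with_wt = data.flatMap { line =>
  // Each line looks like "0; best wrap ear ...": label before ';', words after it
  val parts = line.split(";")
  val a = parts(0).trim.toInt
  val words = parts(1).trim.split(" ")
  // One record per word: (document key, (word index, label))
  words.map(w => (line.hashCode(), (vocab_lookup.value(w), a)))
}.groupByKey()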

The expected output is:

(1453543,(best,0),(wrap,0),(ear,0),(market,0),(pair,0),(break,0),(make,0))
(3942334,(time,1),(sennheiser,1),(product,1),(better,1),(earphone,1),(fit,1))
...

After generating the result above, I use it in this code to produce the final result:

val Beta = DenseMatrix.zeros[Int](V, S)
val Beta_c = grouped_with_wt.flatMap(kv => {
  kv._2.map(wt => {
    Beta(wt._1, wt._2) += 1
  })
})

Final result:

1 0
1 0
1 0
1 0
...

This code doesn't work well. Can anyone help me? I'd like code similar to the above.
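A likely reason the `Beta` code has no effect: Spark serializes the closure passed to `flatMap`, so each executor increments its own copy of `Beta` and the driver's matrix is never updated; the transformation is also lazy, so without an action it never runs at all. A common pattern is to count the `(wordIndex, label)` pairs on the cluster and fill the matrix on the driver. The sketch below shows that counting logic on a plain Scala `Seq` so it runs without a cluster. The small `vocab` map (standing in for `vocab_lookup.value`), the sizes passed as `numWords`/`numLabels`, and the `Array[Array[Int]]` standing in for Breeze's `DenseMatrix` are all assumptions for illustration.

```scala
object BetaCounts {
  // Hypothetical vocabulary: word -> row index in Beta (stands in for vocab_lookup.value)
  val vocab: Map[String, Int] = Map("best" -> 0, "wrap" -> 1, "time" -> 2, "fit" -> 3)

  // Beta(v)(s) = number of times word v occurs in documents labelled s.
  // Labels are assumed to lie in 0 until numLabels; unknown words are skipped.
  def buildBeta(lines: Seq[String], numWords: Int, numLabels: Int): Array[Array[Int]] = {
    // Step 1 (on an RDD: flatMap): emit one ((wordIndex, label), 1) per known word
    val pairs = lines.flatMap { line =>
      val parts = line.split(";", 2)
      val label = parts(0).trim.toInt
      parts(1).trim.split("\\s+").toSeq
        .filter(vocab.contains)
        .map(w => ((vocab(w), label), 1))
    }
    // Step 2 (on an RDD: reduceByKey(_ + _) followed by collect()): sum the counts per pair
    val counts = pairs.groupBy(_._1).map { case (key, ones) => (key, ones.map(_._2).sum) }
    // Step 3: fill the matrix locally, where the updates are actually visible
    val beta = Array.fill(numWords, numLabels)(0)
    counts.foreach { case ((v, s), n) => beta(v)(s) += n }
    beta
  }

  def main(args: Array[String]): Unit = {
    val docs = Seq(
      "0; best wrap ear market pair pair break make",
      "1; time sennheiser product better earphone fit"
    )
    val beta = buildBeta(docs, numWords = 4, numLabels = 2)
    beta.foreach(row => println(row.mkString(" ")))
  }
}
```

On an RDD the three steps correspond to `flatMap`, `reduceByKey(_ + _)` and `collect()`; collecting is reasonable here because the result holds at most V × S counts, however large the input is.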

Answer

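val inputRDD = sc.textFile("input dir")
val outRDD = inputRDD.map(r => {
  // "0; best wrap ear ..." -> label "0" and the list of words
  val tuple = r.split(";")
  val key = tuple(0)
  val words = tuple(1).trim().split(" ")
  // Pair every word with the document's label
  val outArr = words.map(w => (w, key))
  // Key each document by the hash code of its original line
  (r.hashCode, outArr.mkString(","))
})
outRDD.saveAsTextFile("output dir")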

Output

(-1704185638,(best,0),(wrap,0),(ear,0),(market,0),(pair,0),(pair,0),(break,0),(make,0)) 
(147969209,(time,5),(sennheiser,5),(product,5),(better,5),(earphone,5),(fit,5)) 
(1145947974,(recommend,1),(headphone,1),(pretty,1),(decent,1),(full,1),(sound,1),(earbud,1),(design,1)) 
(838871770,(originally,4),(buy,4),(work,4),(gym,4),(work,4),(well,4),(robust,4),(sound,4),(quality,4),(good,4),(clip,4)) 
(934228708,(terrific,5),(sound,5),(great,5),(fit,5),(toss,5),(mine,5),(profuse,5),(sweater,5),(headphone,5)) 
(659513416,(negative,-3),(experienced,-3),(sit,-3),(chair,-3),(back,-3),(touch,-3),(chair,-3),(earplug,-3),(displace,-3),(hurt,-3)) 

Thanks for your reply. Is it possible to use flatMap instead of map, like the code in my question? I want to use this flatMapped RDD in my program. – Rozita
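One way to keep the answer's parsing but get a flat RDD, as the comment asks, is to emit one `(docKey, (word, label))` record per word from `flatMap` and regroup with `groupByKey()` only where per-document grouping is needed. The sketch below runs that per-line function on a plain Scala `Seq` so it can be checked without a cluster; on an RDD the same function would be passed to `inputRDD.flatMap(...)`. The object and function names are hypothetical. Using `line.hashCode` as the document key follows the question and answer; note that two different lines can share a hash code.

```scala
object FlatMapWords {
  // One record per word: (document key, (word, label)).
  // Same parsing as the answer's map, but returned flat instead of joined into a string.
  def toWordRecords(line: String): Seq[(Int, (String, Int))] = {
    val parts = line.split(";", 2) // label before the first ';', words after it
    val label = parts(0).trim.toInt
    val words = parts(1).trim.split("\\s+").toSeq
    words.map(w => (line.hashCode, (w, label)))
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "0; best wrap ear market pair pair break make",
      "1; time sennheiser product better earphone fit"
    )
    // On an RDD: inputRDD.flatMap(FlatMapWords.toWordRecords)
    val flat = lines.flatMap(toWordRecords)
    // On an RDD: flat.groupByKey(); groupBy + map plays that role on a local Seq
    val grouped = flat.groupBy(_._1).map { case (k, recs) => (k, recs.map(_._2)) }
    // Prints lines in the same shape as the expected output, e.g. (key,(best,0),(wrap,0),...)
    grouped.foreach { case (k, ws) => println((k +: ws).mkString("(", ",", ")")) }
  }
}
```

Keeping the RDD flat means later steps, such as counting `(word, label)` pairs, can work on it directly without first unpacking the grouped values.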