2016-12-17 79 views
0

我是新来的spark.I在转换RDD的特定领域有疑问。 我有一个文件,如下图所示:转变RDD的特定领域

2016-11-10T07:01:37|AAA|S16.12|MN-MN/AAA-329044|288364|2|3 
2016-11-10T07:01:37|BBB|S16.12|MN-MN/AAA-329044/BBB-1|304660|0|0 
2016-11-10T07:01:37|TSB|S16.12|MN-MN/AAA-329044/BBB-1/TSB-1|332164|NA|NA 
2016-11-10T07:01:37|RX|S16.12|MN-MN/AAA-329044/BBB-1/TSB-1/RX-1|357181|0|1 

而且我想输出中象下面这样:在第三场我要删除所有字符,由分隔的整数|。

2016-11-10T07:01:37|AAA|16.12|329044|288364|2|3 
2016-11-10T07:01:37|BBB|16.12|329044|1|304660|0|0 
2016-11-10T07:01:37|TSB|16.12|329044|1|1|332164|NA|NA 
2016-11-10T07:01:37|RX|16.12|329044|1|1|1|357181|0|1 

我该怎么做。 我试过下面的代码。

val inputRdd =sc.textFile("file:///home/arun/Desktop/inputcsv.txt"); 
val result =inputRdd.flatMap(line=>line.split("\\|")).collect; 
def ghi(arr:Array[String]):Array[String]= 
{ 
var outlist=scala.collection.mutable.Buffer[String](); 
for(i <-0 to arr.length-1){ 
if(arr(i).matches("(.*)-(.*)")){ 
var io=arr(i); var arru=scala.collection.mutable.Buffer[String](); 
if(io.contains("/")) 
{ 
var ki=io.split("/"); 
for(st <-0 to ki.length-1) 
{ 
var ion =ki(st).split("-"); 
arru+=ion(1); 
} 
var strui=""; 
for(in <-0 to arru.length-1) 
{ 
strui=strui+arru(in)+"|"; 
} 
outlist+=strui; 
} 
else 
{   
var ion =arr(i).split("-"); 
outlist+=ion(1)+"|"; 
} 
} 
else 
{ 
outlist+=arr(i); 
} 
} 
return outlist.toArray; 
} 
var output=ghi(result); 
val finalrdd=sc.parallelize(out, 1); 
finalrdd.collect().foreach(println); 

请帮帮我。

+2

什么是你的疑问?有什么不工作?你试过什么了? – maasg

+0

如果你把第三个字段,例如(MN-MN/AAA-329044/BBB-1),我想把这个字段转换为329044 | 1.我想删除所有字符。 – Khumar

+0

添加您尝试过的代码。 – mrsrinivas

回答

0

我们需要做的是从该字段中提取数字,并将它们作为新条目添加到正在处理的Array中。

像这样的东西应该做的:

// use data provided as sample 
val dataSample ="""2016-11-10T07:01:37|AAA|S16.12|MN-MN/AAA-329044|288364|2|3 
2016-11-10T07:01:37|BBB|S16.12|MN-MN/AAA-329044/BBB-1|304660|0|0 
2016-11-10T07:01:37|TSB|S16.12|MN-MN/AAA-329044/BBB-1/TSB-1|332164|NA|NA 
2016-11-10T07:01:37|RX|S16.12|MN-MN/AAA-329044/BBB-1/TSB-1/RX-1|357181|0|1""".split('\n') 

val data = sparkContext.parallelize(dataSample) 

val records= data.map(line=> line.split("\\|")) 

// this regex can find and extract the contiguous digits in a mixed string. 
val numberExtractor = "\\d+".r.unanchored 

// we replace field#3 with the results of the regex 
val field3Exploded = records.map{arr => arr.take(3) ++ numberExtractor.findAllIn(arr.drop(3).head) ++ arr.drop(4)} 

// Let's visualize the result 
field3Exploded.collect.foreach(arr=> println(arr.mkString(","))) 

2016-11-10T07:01:37,AAA,S16.12,329044,288364,2,3 
2016-11-10T07:01:37,BBB,S16.12,329044,1,304660,0,0 
2016-11-10T07:01:37,TSB,S16.12,329044,1,1,332164,NA,NA 
2016-11-10T07:01:37,RX,S16.12,329044,1,1,1,357181,0,1 
+0

也可以将其视为Spark笔记本:https://gist.github.com/maasg/28109dc3ce14b894bde3dab40a42ab4b – maasg

+0

如何将此结果存储到RDD中,以便我可以将该RDD用于其他转换。 – Khumar

+0

@khumar你可以对这个中间结果进行进一步的转换,或者使用其中一个另存为...功能。查看文档:http://spark.apache.org/documentation.html – maasg