2016-03-02 235 views
0

我有一个spark.rdd.RDD[String] MapPartition,我用过滤器创建。Spark - Rdd字符串清理/操作

val myMapPartition = myTextFile.filter(_.split("\t")(2) == "\"red\"") 

该过滤器由制表符分割我的文本文件线和检查所得到的阵列的第二元件是否等于“红色”

myMapPartition.collect()返回String类型的Array。这里是一个例子:

24344 "someString" "red" 
23421 "someOtherString" "red" 

我想对字符串执行一些编辑。最终,我在查看一些字符串替换逻辑,但我试图先串联一个字符串。所以我要寻找的是这样的:

24344 "someString hello" "red" 
23421 "someOtherString hello" "red" 

我试图做到这一点使用map

val myCleanRdd = myMapPartition.map(_1 => (_1.concat(" hello"))) 

不过,我结束了:

24344 "someString" "red" hello 
23421 "someOtherString" "red" hello 

我的问题是我如何操纵rdd行的某些元素?我认为问题在于我的排被认为是一个String。我不知道如何正确映射这个,让我专注于个别领域。

免责声明:斯卡拉/星火小白

回答

2

首先,您需要在您的原始RDD的每个元素映射split,所以你最终与RDD[Array[String]]而不是RDD[String],例如

myTextFile.map(_.split("\t")).filter(_(2) == "\"red\"") 

你目前正在使用split过滤字符串你的输入RDD,但是这仅仅是创建输出字符串的RDD,扔掉你对split他们的工作。

然后,如果您的RDD的每个元素是一个已知长度的Array[String],然后就可以map使用图案匹配(使用case关键字)来提取和修改单个元件,例如:

rdd.map { case Array(x, y, z) => Array(x, y + " hello", z) } 

(请注意,使用此方法时,必须使用大括号{}而不是括号map函数的括号())。类似的模式匹配可以对列表,元组,元素等行进行匹配...

更新:如果您想用处理过的版本替换其中一个元素,这是类似的模式。

rdd.map { case Array(x, y, z) => Array(x, y.replace("s","x"), z) } 

要打印出RDD[Array[String]]的所有元素,你可以做一个嵌套foreach,例如

rdd.foreach(_.foreach(println)) 

打印出的每一行作为一个数组是棘手不是因为重载方法预期(一个通常会使用Arrays.toString但这seems to cause type problems在斯卡拉),但可以做如下:

rdd.foreach(row => println(row.mkString("[",",","]"))) 
+0

正是我正在寻找!有两件事情:你可以讨论如何将这个模式应用于像stringReplace功能的东西吗?另外,一旦我得到'Array [String]'的原始红色,我该如何打印?现在我只看到内存地址?谢谢! –

+0

请参阅我的更新回答。 – DNA