2017-09-15 92 views
2

由于数据是这样的:如何根据数据类型过滤数据?

val my_data = sc.parallelize(Array(
    "Key1, foobar, 10, twenty, 20", 
    "Key2, impt, 11, sixty, 6", 
    "Key3, helloworld, 110, seventy, 9")) 

我想过滤并创建一个key,value RDD象下面这样:

key1, foobar 
key1, twenty 
key2, impt 
key2, sixty 
key3, helloworld 
key3, seventy 

我已经试过

我想,我可以只需将数据放在一个表中并让数据类型被推断即可。

//is there a way to avoid writing to file??? 
my_data.coalesce(1).saveAsTextFile("/tmp/mydata.csv") 
val df_mydata = sqlContext.read 
.format("com.databricks.spark.csv") 
.option("inferSchema", "true") 
.load("/tmp/mydata.csv") 

上面的工作,使我有一个正确的数据类型的表。但是,我不知道如何过滤数据类型,然后从中创建键/值对。

我还可以使用Character.isDigit,而不是创建一个模式,但还需要知道如何筛选键/值对解决这将是

回答

0

的一种方式,正如你所说,用一个split一起使用Character.isDigitflatMap。使用您my_data为例:

val spark = SparkSession.builder.getOrCreate() 
import spark.implicits._ 

val df = my_data.map(_.split(",").map(_.trim).toList.filterNot(s => s.forall(_.isDigit))) 
    .flatMap{case ((key: String)::tail) => tail.map(t => (key, t))}.toDF("Key", "Value") 
df.show() 

,这将给你这样的事情:

+----+----------+ 
| Key|  Value| 
+----+----------+ 
|Key1| foobar| 
|Key1| twenty| 
|Key2|  impt| 
|Key2|  sixty| 
|Key3|helloworld| 
|Key3| seventy| 
+----+----------+ 

在这里,我也把它转换成数据帧,但是如果你想要一个RDD就可以直接跳过这一步。为了使它工作,每行必须包含一个键,并且该键应该位于字符串中的第一个位置。

希望它有帮助!


编辑:

中使用的命令的击穿。

的第一张地图经过每串在你的RDD,每串应用(按顺序)如下:

.split(",") 
.map(_.trim) 
.toList 
.filterNot(s => s.forall(_.isDigit)) 

让我们用你的第一排为例:"Key1, foobar, 10, twenty, 20"。首先,行被“,”分割,这将给你一串字符串Array("Key1", " foobar", " 10", " twenty", " 20")

接下来是map(_.trim)它将修剪(删除单词之前和之后的空格)数组中的每个元素,该数组也将转换为列表(以后在flatMap中匹配的情况):List("Key1", "foobar", "10", "twenty", "20")

filterNot将删除所有字符都是数字的所有字符串。这里的forall会检查每个角色是否满足这个条件。这将删除列表中的一些元素:List("Key1", "foobar", "twenty")

现在,关键的遗体后,只进行了分组过滤:

flatMap{case ((key: String)::tail) => tail.map(t => (key, t))} 

这里key成为第一个元素的每一行的名单,以下从它变成“KEY1”前行的例子。 tail只是列表中的其余部分。然后,对于不是key值的每个元素,我们用元组(key, value)替换它。换句话说,每个元素(除了第一个元素,即key)都会变成包含key及其本身的元组。这里使用的是flatMap,否则你会得到一个元组列表,而不是所需的元组列表。

最后一个将其转换为使用toDF("Key", "Value")的命名列的数据框,请注意,这需要在开始时使用导入(import spark.implicits._)。

+0

我是新来的斯卡拉和一般的火花。如果可能的话,你能否分解这个链式命令来解释这些步骤的作用?我已确认您的解决方案正常运行,并感谢您的支持! –

+0

@ spark-health-learn当然,我添加了对命令的解释以及它们如何一起工作来接收结果。希望它能帮助你学习:)如果它对你有帮助,请点击复选标记/ upvote接受答案。 – Shaido

+0

这真的很有帮助。 'tail.map(t =>(key,t)''的最后一个命令让我暂时不了解。 –