2017-04-21 9 views
3

我面临着如何将多值列(即List[String])拆分为单独的行的问题。如何使用类型化数据集将多值列拆分为单独的行?

初始数据集有以下几种类型:Dataset[(Integer, String, Double, scala.List[String])]

+---+--------------------+-------+--------------------+ 
| id|  text   | value | properties  | 
+---+--------------------+-------+--------------------+ 
| 0|Lorem ipsum dolor...| 1.0|[prp1, prp2, prp3..]| 
| 1|Lorem ipsum dolor...| 2.0|[prp4, prp5, prp6..]| 
| 2|Lorem ipsum dolor...| 3.0|[prp7, prp8, prp9..]| 

将所得的数据集应该有以下几种类型:

Dataset[(Integer, String, Double, String)] 

properties应拆分使得:

+---+--------------------+-------+--------------------+ 
| id|  text   | value | property  | 
+---+--------------------+-------+--------------------+ 
| 0|Lorem ipsum dolor...| 1.0|  prp1  | 
| 0|Lorem ipsum dolor...| 1.0|  prp2  | 
| 0|Lorem ipsum dolor...| 1.0|  prp3  | 
| 1|Lorem ipsum dolor...| 2.0|  prp4  | 
| 1|Lorem ipsum dolor...| 2.0|  prp5  | 
| 1|Lorem ipsum dolor...| 2.0|  prp6  | 

回答

3

explode所常说的,但它会从非类型化数据框中API和使用数据集给,我觉得flatMap运营商可能是更好的选择(见org.apache.spark.sql.Dataset)。

flatMap[U](func: (T) ⇒ TraversableOnce[U])(implicit arg0: Encoder[U]): Dataset[U] 

(Scala的特异性)通过首先将函数应用于该数据集的所有元素,然后平坦化的结果返回一个新的数据集。

如下您可以使用它:

val ds = Seq(
    (0, "Lorem ipsum dolor", 1.0, Array("prp1", "prp2", "prp3"))) 
    .toDF("id", "text", "value", "properties") 
    .as[(Integer, String, Double, scala.List[String])] 

scala> ds.flatMap { t => 
    t._4.map { prp => 
    (t._1, t._2, t._3, prp) }}.show 
+---+-----------------+---+----+ 
| _1|    _2| _3| _4| 
+---+-----------------+---+----+ 
| 0|Lorem ipsum dolor|1.0|prp1| 
| 0|Lorem ipsum dolor|1.0|prp2| 
| 0|Lorem ipsum dolor|1.0|prp3| 
+---+-----------------+---+----+ 

// or just using for-comprehension 
for { 
    t <- ds 
    prp <- t._4 
} yield (t._1, t._2, t._3, prp) 
2

您可以使用explode

df.withColumn("property", explode($"property")) 

val df = Seq((1, List("a", "b"))).toDF("A", "B") 
// df: org.apache.spark.sql.DataFrame = [A: int, B: array<string>] 

df.withColumn("B", explode($"B")).show 
+---+---+ 
| A| B| 
+---+---+ 
| 1| a| 
| 1| b| 
+---+---+ 
1

这里有一个办法做到这一点:

val myRDD = sc.parallelize(Array(
    (0, "text0", 1.0, List("prp1", "prp2", "prp3")), 
    (1, "text1", 2.0, List("prp4", "prp5", "prp6")), 
    (2, "text2", 3.0, List("prp7", "prp8", "prp9")) 
)).map{ 
    case (i, t, v, ps) => ((i, t, v), ps) 
}.flatMapValues(x => x).map{ 
    case ((i, t, v), p) => (i, t, v, p) 
} 
+0

哦,不。这是RDD API吗?为什么人们希望在数据集时代这样做? –

+1

我认为RDD和DataSet [有他们的地方](https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets .html),尽管在这种情况下我同意直接使用DataSet是一种更好的方法。 –

相关问题