2017-07-25 65 views
0

我想使用Spark DataSet加载相当大的数据(比方说),其中的子集数据看起来如下所示。Spark中的关系转换

|age|maritalStatus| name|sex| 
+---+-------------+--------+---+ 
| 35|   M| Joanna| F| 
| 25|   S|Isabelle| F| 
| 19|   S| Andy| M| 
| 70|   M| Robert| M| 
+---+-------------+--------+---+ 

我需要的是有关系的转换,其中一列从另一列(S)获得它的价值。 例如,根据每个人记录的“年龄”&“性别”,我需要将Mr或Ms/Mrs放在每个“name”属性前面。另一个例子是,对于60岁以上的“年龄”的人,我需要将他或她标为老年公民(派生专栏“seniorCitizen”为Y)。

我对转换后的数据最终需要的是如下:

+---+-------------+---------------------------+---+ 
|age|maritalStatus|   name|seniorCitizen|sex| 
+---+-------------+---------------------------+---+ 
| 35|   M| Mrs. Joanna|   N| F| 
| 25|   S| Ms. Isabelle|   N| F| 
| 19|   S|  Mr. Andy|   N| M| 
| 70|   M| Mr. Robert|   Y| M| 
+---+-------------+--------+------------------+---+ 

大多数火花提供转换是相当静态的,而不是dyanmic。例如,如在示例herehere中定义的。

我正在使用Spark Datasets,因为我是从关系数据源加载的,但如果您可能会建议使用普通RDD进行此操作的更好方法,请执行此操作。

+1

你可以使用Dataframes和UDF做到这一点,你可以结合whenconcat,​​3210功能。 –

+0

我想你的名字转换应该取决于婚姻状况而不是年龄,不是吗? –

回答

2

您可以使用withColumn添加一个新列,用于seniorCitizen使用where子句和更新name您可以使用定义的函数(udf)用户如下

import spark.implicits._ 

import org.apache.spark.sql.functions._ 
//create a dummy data 
val df = Seq((35, "M", "Joanna", "F"), 
    (25, "S", "Isabelle", "F"), 
    (19, "S", "Andy", "M"), 
    (70, "M", "Robert", "M") 
).toDF("age", "maritalStatus", "name", "sex") 

// create a udf to update name according to age and sex 
val append = udf((name: String, maritalStatus:String, sex: String) => { 
    if (sex.equalsIgnoreCase("F") && maritalStatus.equalsIgnoreCase("M")) s"Mrs. ${name}" 
    else if (sex.equalsIgnoreCase("F")) s"Ms. ${name}" 
    else s"Mr. ${name}" 
}) 

//add two new columns with using withColumn 
df.withColumn("name", append($"name", $"maritalStatus", $"sex")) 
    .withColumn("seniorCitizen", when($"age" < 60, "N").otherwise("Y")).show 

输出:

+---+-------------+------------+---+-------------+ 
|age|maritalStatus|  name|sex|seniorCitizen| 
+---+-------------+------------+---+-------------+ 
| 35|   M| Mrs. Joanna| F|   N| 
| 25|   S|Ms. Isabelle| F|   N| 
| 19|   S| Mr. Andy| M|   N| 
| 70|   M| Mr. Robert| M|   Y| 
+---+-------------+------------+---+-------------+ 

编辑:

这里是输出没有使用UDF

df.withColumn("name", 
    when($"sex" === "F", when($"maritalStatus" === "M", concat(lit("Ms. "), df("name"))).otherwise(concat(lit("Ms. "), df("name")))) 
    .otherwise(concat(lit("Ms. "), df("name")))) 
    .withColumn("seniorCitizen", when($"age" < 60, "N").otherwise("Y")) 

希望这有助于!

+0

我希望这有助于:) –

1

Spark functions可以帮助您完成工作。如下面的陈述

val updateName = when(lower($"maritalStatus") === "m" && lower($"sex") === "f", concat(lit("Mrs. "), $"name")) 
         .otherwise(when(lower($"maritalStatus") === "s" && lower($"sex") === "f", concat(lit("Ms. "), $"name")) 
         .otherwise(when(lower($"sex") === "m", concat(lit("Mr. "), $"name")))) 

val updatedDataSet = dataset.withColumn("name", updateName) 
    .withColumn("seniorCitizen", when($"age" > 60, "Y").otherwise("N")) 

updatedDataSet是你需要dataset