2017-07-07 101 views
2

我从它看起来像一个SQL源的数据帧:根据列值对火花数据帧进行分区?

User(id: Long, fname: String, lname: String, country: String) 

[1, Fname1, Lname1, Belarus] 
[2, Fname2, Lname2, Belgium] 
[3, Fname3, Lname3, Austria] 
[4, Fname4, Lname4, Australia] 

我想分区和写数据到CSV文件,其中每个分区是基于该国的首字母,所以白俄罗斯和比利时应一个在输出文件,奥地利和澳大利亚在其他。

回答

2

这里是你可以做什么

import org.apache.spark.sql.functions._ 
//create a dataframe with demo data 
val df = spark.sparkContext.parallelize(Seq(
    (1, "Fname1", "Lname1", "Belarus"), 
    (2, "Fname2", "Lname2", "Belgium"), 
    (3, "Fname3", "Lname3", "Austria"), 
    (4, "Fname4", "Lname4", "Australia") 
)).toDF("id", "fname","lname", "country") 

//create a new column with the first letter of column 
val result = df.withColumn("countryFirst", split($"country", "")(0)) 

//save the data with partitionby first letter of country 

result.write.partitionBy("countryFirst").format("com.databricks.spark.csv").save("outputpath") 

编辑: 您还可以使用可以提高性能通过Raphel的建议作为

substring(Column str, int pos, int len)子字符串开始的子字符串在str时,长度为len的是 ; str是字符串类型或返回字节片段 阵列,在字节开始于POS,其长度为LEN的时候str是 二进制类型

val result = df.withColumn("firstCountry", substring($"country",1,1)) 

,然后用写

希望使用partitionby这个解决您的问题!

+0

除了这个问题之外,df.withColumn是否会影响性能,或者是否可以以更有效的方式完成? – jdk2588

+1

你也可以使用spark的'substring'函数代替'split',我认为这样更具可读性 –

+0

我们可以用多列来做到这一点吗? – user482963

0

解决此问题的一种替代方法是首先创建一个只包含每个国家的首字母的列。完成此步骤后,您可以使用partitionBy将每个分区保存为单独的文件。

dataFrame.write.partitionBy("column").format("com.databricks.spark.csv").save("/path/to/dir/") 
+0

这将在列值上创建分区,因此我们将为单独的文件在白俄罗斯和比利时不在一个文件中。 – jdk2588

+0

是的,正如我所提到的,您需要先创建一个包含国家第一个字母的单独列。然后在该列上使用'partitionBy' – Shaido