Here is what you can do; below is a simple example:
import spark.implicits._ // assumes an existing SparkSession named "spark", as in spark-shell
val data = spark.sparkContext.parallelize(Seq(
(29,"City 2", 72),
(28,"City 3", 48),
(28,"City 2", 19),
(27,"City 2", 16),
(28,"City 1", 84),
(28,"City 4", 72),
(29,"City 4", 39),
(27,"City 3", 42),
(26,"City 3", 68),
(27,"City 1", 89),
(27,"City 4", 104),
(26,"City 2", 19),
(29,"City 3", 27)
)).toDF("week", "city", "sale")
// the above creates a DataFrame with dummy data

// get the list of distinct cities
val cities = data.select("city").distinct.collect().flatMap(_.toSeq)

// filter the full DataFrame once per city;
// this returns Array[(Any, DataFrame)] as (city, DataFrame)
val result = cities.map(c => c -> data.where($"city" === c))
// print all the DataFrames
result.foreach { case (city, df) =>
  println(s"Dataframe with $city")
  df.show()
}
The output looks like this:

Dataframe with City 1
+----+------+----+
|week|  city|sale|
+----+------+----+
|  28|City 1|  84|
|  27|City 1|  89|
+----+------+----+

Dataframe with City 3
+----+------+----+
|week|  city|sale|
+----+------+----+
|  28|City 3|  48|
|  27|City 3|  42|
|  26|City 3|  68|
|  29|City 3|  27|
+----+------+----+

Dataframe with City 4
+----+------+----+
|week|  city|sale|
+----+------+----+
|  28|City 4|  72|
|  29|City 4|  39|
|  27|City 4| 104|
+----+------+----+

Dataframe with City 2
+----+------+----+
|week|  city|sale|
+----+------+----+
|  29|City 2|  72|
|  28|City 2|  19|
|  27|City 2|  16|
|  26|City 2|  19|
+----+------+----+
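
If the goal is one set of output files per city, each split can be written out directly. A minimal sketch, assuming CSV output and a hypothetical base path /tmp/sales (both are placeholders, not part of the original answer):

// write each per-city DataFrame to its own directory (hypothetical path)
result.foreach { case (city, df) =>
  df.write
    .mode("overwrite")
    .csv(s"/tmp/sales/$city") // one directory of part files per city
}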
You can also use partitionBy to group the data and write it to the output:

dataframe.write.partitionBy("col").saveAsTable("outputpath")

This creates one partition (a subdirectory) per distinct value of "col". Note that saveAsTable expects a table name rather than a path; to write directly to a filesystem path, use a format-specific writer such as .parquet(path) or .csv(path). Hope this helps to group each output file!
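
For instance, with the sample data above, a minimal sketch partitioning by city (the output path /tmp/sales_partitioned is a placeholder):

// one subdirectory per distinct value of the partition column
data.write
  .partitionBy("city")
  .mode("overwrite")
  .parquet("/tmp/sales_partitioned") // placeholder path

// resulting layout, roughly:
// /tmp/sales_partitioned/city=City 1/part-*.parquet
// /tmp/sales_partitioned/city=City 2/part-*.parquet
// ...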
Thanks - this is perfect. The suggested duplicate also answered my question, but I have accepted your answer. – thebigdog
Thanks a lot for accepting, @thebigdog :) –