2017-06-13 40 views
0

我有一个包含以下数据框:枢转数据框 - 星火SQL

TradeId|Source 
ABC|"USD,333.123,20170605|USD,-789.444,20170605|GBP,1234.567,20150602" 

我要那么它变成下面

TradeId|CCY|PV 
ABC|USD|333.123 
ABC|USD|-789.444 
ABC|GBP|1234.567 

转动这个数据CCY数量|光伏|日期 “来源”列中的三元组不是固定的。我可以在ArrayList中做到这一点,但这需要在JVM中加载数据并击败Spark的整个点。

比方说我的数据框看起来如下:

DataFrame tradesSnap = this.loadTradesSnap(reportRequest); 
String tempTable = getTempTableName(); 
tradesSnap.registerTempTable(tempTable); 
tradesSnap = tradesSnap.sqlContext().sql("SELECT TradeId, Source FROM " + tempTable); 

回答

1

如果你读databricks pivot,它说:" A pivot is an aggregation where one (or more in the general case) of the grouping columns has its distinct values transposed into individual columns."这是不是你想要什么,我猜

我会建议你使用withColumnfunctions得到你想要的最终输出。你可以做如下考虑dataframe是你必须

+-------+----------------------------------------------------------------+ 
|TradeId|Source               | 
+-------+----------------------------------------------------------------+ 
|ABC |USD,333.123,20170605|USD,-789.444,20170605|GBP,1234.567,20150602| 
+-------+----------------------------------------------------------------+ 

你可以做什么以下使用explodesplitwithColumn以获得所需的输出

val explodedDF = dataframe.withColumn("Source", explode(split(col("Source"), "\\|"))) 
val finalDF = explodedDF.withColumn("CCY", split($"Source", ",")(0)) 
    .withColumn("PV", split($"Source", ",")(1)) 
    .withColumn("Date", split($"Source", ",")(2)) 
    .drop("Source") 

finalDF.show(false) 

最终输出是

+-------+---+--------+--------+ 
|TradeId|CCY|PV  |Date | 
+-------+---+--------+--------+ 
|ABC |USD|333.123 |20170605| 
|ABC |USD|-789.444|20170605| 
|ABC |GBP|1234.567|20150602| 
+-------+---+--------+--------+ 

我希望这可以解决您的问题

+0

是的,我最终也找到了这个选项。它更易于理解,甚至可以将其添加到我的初始选择查询中。 – Achilles

+0

很高兴听到@Archilles。并感谢您的接受和upvote :) –

2

而不是旋转,你正在努力实现看起来更像flatMap什么。

简单地说,通过对你适用于各行的函数(map)本身会产生行序列中的Dataset使用flatMap。然后将每组行连接成单个序列(flat)。

下面的程序给出了这个概念:

import org.apache.spark.sql.SparkSession 

case class Input(TradeId: String, Source: String) 

case class Output(TradeId: String, CCY: String, PV: String, Date: String) 

object FlatMapExample { 

    // This function will produce more rows of output for each line of input 
    def splitSource(in: Input): Seq[Output] = 
    in.Source.split("\\|", -1).map { 
     source => 
     println(source) 
     val Array(ccy, pv, date) = source.split(",", -1) 
     Output(in.TradeId, ccy, pv, date) 
    } 

    def main(args: Array[String]): Unit = { 

    // Initialization and loading 
    val spark = SparkSession.builder().master("local").appName("pivoting-example").getOrCreate() 
    import spark.implicits._ 
    val input = spark.read.options(Map("sep" -> "|", "header" -> "true")).csv(args(0)).as[Input] 

    // For each line in the input, split the source and then 
    // concatenate each "sub-sequence" in a single `Dataset` 
    input.flatMap(splitSource).show 
    } 

} 

鉴于你的输入,这将是输出:

+-------+---+--------+--------+ 
|TradeId|CCY|  PV| Date| 
+-------+---+--------+--------+ 
| ABC|USD| 333.123|20170605| 
| ABC|USD|-789.444|20170605| 
| ABC|GBP|1234.567|20150602| 
+-------+---+--------+--------+ 

现在,您可以采取的结果,它,保存到CSV如果您想。