2016-11-11 74 views
0

我具有由火花数据帧像下面的输出:劈裂列在将新行[斯卡拉]

金额| ID |数字|开始_ |标识符
43.45 | 19840 | A345 | [2014 -12-26,2013-12-12] | [232323,45466] |
43.45 | 19840 | A345 | [2010-03-16,2013-16-12] | [34343,45454] |

我的要求是从上述输出

金额以下格式以生成输出| ID |数字|开始_ |标识符
43.45 | 19840 | A345 | 2014-12-26 | 232323
43.45 | 19840 | A345 |二〇一三年十二月十二日| 45466
43.45 | 19840 | A345 | 2010-03-16 | 34343
43.45 | 19840 | A345 | 2013-16-12 | 45454

有人可以帮我实现这个。

回答

1

这是你要找的东西吗?

import org.apache.spark.sql._ 
import org.apache.spark.sql.functions._ 

val sparkSession = ... 
import sparkSession.implicits._ 

val input = sc.parallelize(Seq(
    (43.45, 19840, "A345", Seq("2014-12-26", "2013-12-12"), Seq(232323,45466)), 
    (43.45, 19840, "A345", Seq("2010-03-16", "2013-16-12"), Seq(34343,45454)) 
)).toDF("amt", "id", "num", "start_date", "identifier") 

val zipArrays = udf { (dates: Seq[String], identifiers: Seq[Int]) => 
    dates.zip(identifiers) 
} 

val output = input.select($"amt", $"id", $"num", explode(zipArrays($"start_date", $"identifier"))) 
    .select($"amt", $"id", $"num", $"col._1".as("start_date"), $"col._2".as("identifier")) 

output.show() 

将返回:

+-----+-----+----+----------+----------+ 
| amt| id| num|start_date|identifier| 
+-----+-----+----+----------+----------+ 
|43.45|19840|A345|2014-12-26| 232323| 
|43.45|19840|A345|2013-12-12|  45466| 
|43.45|19840|A345|2010-03-16|  34343| 
|43.45|19840|A345|2013-16-12|  45454| 
+-----+-----+----+----------+----------+ 

编辑:

既然你想有应拉上多列,你应该尝试这样的事:

val input = sc.parallelize(Seq(
    (43.45, 19840, "A345", Seq("2014-12-26", "2013-12-12"), Seq("232323","45466"), Seq("123", "234")), 
    (43.45, 19840, "A345", Seq("2010-03-16", "2013-16-12"), Seq("34343","45454"), Seq("345", "456")) 
)).toDF("amt", "id", "num", "start_date", "identifier", "another_column") 

val zipArrays = udf { seqs: Seq[Seq[String]] => 
    for(i <- seqs.head.indices) yield seqs.fold(Seq.empty)((accu, seq) => accu :+ seq(i)) 
} 

val columnsToSelect = Seq($"amt", $"id", $"num") 
val columnsToZip = Seq($"start_date", $"identifier", $"another_column") 
val outputColumns = columnsToSelect ++ columnsToZip.zipWithIndex.map { case (column, index) => 
    $"col".getItem(index).as(column.toString()) 
} 

val output = input.select($"amt", $"id", $"num", explode(zipArrays(array(columnsToZip: _*)))).select(outputColumns: _*) 

output.show() 

/* 
+-----+-----+----+----------+----------+--------------+ 
| amt| id| num|start_date|identifier|another_column| 
+-----+-----+----+----------+----------+--------------+ 
|43.45|19840|A345|2014-12-26| 232323|   123| 
|43.45|19840|A345|2013-12-12|  45466|   234| 
|43.45|19840|A345|2010-03-16|  34343|   345| 
|43.45|19840|A345|2013-16-12|  45454|   456| 
+-----+-----+----+----------+----------+--------------+ 
*/ 
+0

是的,这正是我所看到的。但我认为这只有当我有两列(start_date&标识符)时才有效。是否有可能使其成为动态的,以便它可以用于任意数量的列? –

+0

这是可能的,但我想到的解决方案需要所有这些压缩列具有相同的类型(例如'string',所以'identifier'列中的所有值都必须是字符串)。你会接受吗? –

+0

是的。如果你分享示例代码,那将是非常好的。谢谢。 –

0

如果我理解正确,您需要第3列和第4列的第一个元素。 这是否有意义?

val newDataFrame = for { 
    row <- oldDataFrame 
} yield { 
    val zro = row(0) // 43.45 
    val one = row(1) // 19840 
    val two = row(2) // A345 
    val dates = row(3) // [2014-12-26, 2013-12-12] 
    val numbers = row(4) // [232323,45466] 
    Row(zro, one, two, dates(0), numbers(0)) 
} 
+0

没有,我想要第一个和第二个元素。 col3和col4的第一个元素在一行中,col3的第二个元素和第二行中的col4。 样本输入:43.45 | 19840 | A345 | [2014-12-26,2013-12-12] | [232323,45466] |输出:43.45 | 19840 | A345 | 2014-12-26 | 232323 43.45 | 19840 | A345 | 2013-12-12 | 45466 –

0

你可以使用SparkSQL。

  • 首先创建与信息的视图,我们需要处理:

    df.createOrReplaceTempView("tableTest")

  • 然后你就可以用扩展选择数据:

    sparkSession.sqlContext.sql(
        "SELECT Amt, id, num, expanded_start_date, expanded_id " + 
        "FROM tableTest " + 
        "LATERAL VIEW explode(Start_date) Start_date AS expanded_start_date " + 
        "LATERAL VIEW explode(Identifier) AS expanded_id") 
    .show()