这是你要找的东西吗?
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
val sparkSession = ...
import sparkSession.implicits._
val input = sc.parallelize(Seq(
(43.45, 19840, "A345", Seq("2014-12-26", "2013-12-12"), Seq(232323,45466)),
(43.45, 19840, "A345", Seq("2010-03-16", "2013-16-12"), Seq(34343,45454))
)).toDF("amt", "id", "num", "start_date", "identifier")
val zipArrays = udf { (dates: Seq[String], identifiers: Seq[Int]) =>
dates.zip(identifiers)
}
val output = input.select($"amt", $"id", $"num", explode(zipArrays($"start_date", $"identifier")))
.select($"amt", $"id", $"num", $"col._1".as("start_date"), $"col._2".as("identifier"))
output.show()
将返回:
+-----+-----+----+----------+----------+
| amt| id| num|start_date|identifier|
+-----+-----+----+----------+----------+
|43.45|19840|A345|2014-12-26| 232323|
|43.45|19840|A345|2013-12-12| 45466|
|43.45|19840|A345|2010-03-16| 34343|
|43.45|19840|A345|2013-16-12| 45454|
+-----+-----+----+----------+----------+
编辑:
既然你想有应拉上多列,你应该尝试这样的事:
val input = sc.parallelize(Seq(
(43.45, 19840, "A345", Seq("2014-12-26", "2013-12-12"), Seq("232323","45466"), Seq("123", "234")),
(43.45, 19840, "A345", Seq("2010-03-16", "2013-16-12"), Seq("34343","45454"), Seq("345", "456"))
)).toDF("amt", "id", "num", "start_date", "identifier", "another_column")
val zipArrays = udf { seqs: Seq[Seq[String]] =>
for(i <- seqs.head.indices) yield seqs.fold(Seq.empty)((accu, seq) => accu :+ seq(i))
}
val columnsToSelect = Seq($"amt", $"id", $"num")
val columnsToZip = Seq($"start_date", $"identifier", $"another_column")
val outputColumns = columnsToSelect ++ columnsToZip.zipWithIndex.map { case (column, index) =>
$"col".getItem(index).as(column.toString())
}
val output = input.select($"amt", $"id", $"num", explode(zipArrays(array(columnsToZip: _*)))).select(outputColumns: _*)
output.show()
/*
+-----+-----+----+----------+----------+--------------+
| amt| id| num|start_date|identifier|another_column|
+-----+-----+----+----------+----------+--------------+
|43.45|19840|A345|2014-12-26| 232323| 123|
|43.45|19840|A345|2013-12-12| 45466| 234|
|43.45|19840|A345|2010-03-16| 34343| 345|
|43.45|19840|A345|2013-16-12| 45454| 456|
+-----+-----+----+----------+----------+--------------+
*/
是的,这正是我所看到的。但我认为这只有当我有两列(start_date&标识符)时才有效。是否有可能使其成为动态的,以便它可以用于任意数量的列? –
这是可能的,但我想到的解决方案需要所有这些压缩列具有相同的类型(例如'string',所以'identifier'列中的所有值都必须是字符串)。你会接受吗? –
是的。如果你分享示例代码,那将是非常好的。谢谢。 –