火花2.2,你可以使用函数from_json与没有的JSON解析为您服务。
from_json(E:柱,模式:字符串,选择:地图[字符串,字符串]):柱解析包含JSON字符串转换为StructType
或StructTypes
ArrayType
与指定模式的一列。
支持通过使用*
(星号)来展平嵌套列,这似乎是最佳解决方案。
// the input dataset (just a single JSON blob)
val jsonstrings = Seq("""{
"key1": "value1",
"key2": {
"level2key1": "level2value1",
"level2key2": "level2value2"
}
}""").toDF("jsonstring")
// define the schema of JSON messages
import org.apache.spark.sql.types._
val key2schema = new StructType()
.add($"level2key1".string)
.add($"level2key2".string)
val schema = new StructType()
.add($"key1".string)
.add("key2", key2schema)
scala> schema.printTreeString
root
|-- key1: string (nullable = true)
|-- key2: struct (nullable = true)
| |-- level2key1: string (nullable = true)
| |-- level2key2: string (nullable = true)
val messages = jsonstrings
.select(from_json($"jsonstring", schema) as "json")
.select("json.*") // <-- flattening nested fields
scala> messages.show(truncate = false)
+------+---------------------------+
|key1 |key2 |
+------+---------------------------+
|value1|[level2value1,level2value2]|
+------+---------------------------+
scala> messages.select("key1", "key2.*").show(truncate = false)
+------+------------+------------+
|key1 |level2key1 |level2key2 |
+------+------------+------------+
|value1|level2value1|level2value2|
+------+------------+------------+
感谢这有帮助...有没有人有一些片段来处理例外...就像例如如果key2不存在,它会失败..我想处理它,只是说没找到或类似的东西...我相信它的微不足道的,我可以搞清楚,但如果有人拥有它,将是很好的片段;) –