Your JSON data seems to be corrupted, i.e. it cannot be read into a valid DataFrame with spark.read.json("myfile.json"). A similar issue can be solved by reading the whole file through the wholeTextFiles API and cleaning it up by hand:
// wholeTextFiles yields (path, content) pairs, so the whole malformed file arrives as one string
val rdd = sc.wholeTextFiles("myfile.json")
// normalize the broken formatting, then split the concatenated objects ("}{") so each element is one JSON object
val json = rdd.flatMap(_._2.replace(":\n", ":").replace(",\n", "").replace("}\n", "}").replace(" ", "").replace("}{", "}\n{").split("\n"))
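The exact chain of replace calls depends on how your particular file is broken; if you want to sanity-check the cleanup first, you can print the resulting strings:

// the sample is tiny, so collecting to the driver is safe here
json.collect().foreach(println)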
This should give you the data as an RDD of valid JSON strings:
{"Location":{"filter":{"name":"houston","Disaster":"hurricane"}}}
{"Location":{"filter":{"name":"florida","Disaster":"hurricane"}}}
{"Location":{"filter":{"name":"seattle"}}}
Now you can read the JSON RDD into a DataFrame:
val df = sqlContext.read.json(json)
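Note that json(rdd: RDD[String]) is deprecated in Spark 2.x; if you are on a newer version, a minimal sketch (assuming a SparkSession named spark) would be:

import org.apache.spark.sql.Encoders

// Spark 2.2+ parses JSON from a Dataset[String] instead of an RDD[String]
val df = spark.read.json(spark.createDataset(json)(Encoders.STRING))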
which should give you
+---------------------+
|Location             |
+---------------------+
|[[hurricane,houston]]|
|[[hurricane,florida]]|
|[[null,seattle]]     |
+---------------------+
with the schema
root
|-- Location: struct (nullable = true)
| |-- filter: struct (nullable = true)
| | |-- Disaster: string (nullable = true)
| | |-- name: string (nullable = true)
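Since filter is a nested struct, its leaf fields are addressed with dot notation; for example, you could project them out as plain columns like this:

// pull the nested leaf fields out via dot notation
df.select($"Location.filter.name", $"Location.filter.Disaster").show()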
Now that you have a valid DataFrame, you can apply whatever filter you want:
val newTable = df.filter($"Location.filter.Disaster".isNotNull)
newTable will then be
+---------------------+
|Location             |
+---------------------+
|[[hurricane,houston]]|
|[[hurricane,florida]]|
+---------------------+
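Equivalently, you could express the same condition as a SQL string, which avoids the $ column syntax:

// SQL-expression form of the same null check
val newTable = df.filter("Location.filter.Disaster is not null")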