2016-07-05 56 views
0

我有很多json文件,但是它们没有正确格式化为Spark。我不想编写代码,通过对每行中的每个字典进行规范化来专门将它们转换为正确的格式。合并来自不良JSON的Spark RDD

相反,我希望使用火花来解析他们的内容。我有以下

import json 

import os 

json_dir = '/data/original/TEMP' 
df = sc.wholeTextFiles(os.path.join(json_dir,'*.json')) 
j_docs = df.map(lambda x: json.loads(x[1])).cache() 

这工作正常,j_docs本质上是一个列表的列表。例如,j_docs中的第一项是来自第一个文件的字典列表。

我想将所有这些单独的列表合并为一个大的RDD。理想情况下,无需运行数据收集。

感谢

+1

使用flatMap而不是地图? – C4stor

+0

是男人!谢谢。 – browskie

回答

1

,而不是使用映射到flatMap也正是:)

+0

谢谢,我可以不收集吗?如果我尝试,它会给我一个这样的结果:'PipelinedRDD'对象不可迭代 – browskie

+0

它给出了什么?后来我想呢? – C4stor

+0

这个人在flatMap merged = sc.parallelize(j_docs.flatMap(lambda x:x))之后没有collect()返回它 – browskie