2017-09-05 85 views
2

我有一些包含JSON对象的文本文件(每行一个对象)。示例:根据某个键值(pyspark)从RDD创建多个Spark DataFrame

{"a": 1, "b": 2, "table": "foo"} 
{"c": 3, "d": 4, "table": "bar"} 
{"a": 5, "b": 6, "table": "foo"} 
... 

我想根据表名称将文本文件的内容解析到Spark DataFrame中。所以在上面的例子中,我将有一个“foo”的DataFrame和“bar”的另一个DataFrame。我已尽可能JSON的线分组为一个RDD的内部列出与以下(pyspark)代码:

text_rdd = sc.textFile(os.path.join("/path/to/data", "*")) 
tables_rdd = text_rdd.groupBy(lambda x: json.loads(x)['table']) 

这产生含有元组的列表与以下结构的RDD:

RDD[("foo", ['{"a": 1, "b": 2, "table": "foo"}', ...], 
    ("bar", ['{"c": 3, "d": 4, "table": "bar"}', ...]] 

如何将此RDD分解为每个表键的DataFrame?

编辑:我试图澄清上面有一个单一的文件中包含多个表中的信息行。我知道我可以在我创建的“groupBy”RDD上调用.collectAsMap,但我知道这会在我的驱动程序上占用相当数量的RAM。我的问题是:有没有办法在不使用.collectAsMap的情况下将“groupBy”RDD分成多个DataFrame?

回答

3

可以有效地将其切分为拼花分区: 首先,我们将它转​​换成数据帧:

text_rdd = sc.textFile(os.path.join("/path/to/data", "*")) 
df = spark.read.json(text_rdd) 
df.printSchema() 
    root 
    |-- a: long (nullable = true) 
    |-- b: long (nullable = true) 
    |-- c: long (nullable = true) 
    |-- d: long (nullable = true) 
    |-- table: string (nullable = true) 

现在我们可以把它写:

df.write.partitionBy('table').parquet([output directory name]) 

如果您列出的内容[output directory name],您将会看到与table不同的值:

hadoop fs -ls [output directory name] 

    _SUCCESS 
    table=bar/ 
    table=foo/ 

如果你想只保留每个表的列,你可以这样做(假设列的完整列表出现每当表在文件中出现)

import ast 
from pyspark.sql import Row 
table_cols = spark.createDataFrame(text_rdd.map(lambda l: ast.literal_eval(l)).map(lambda l: Row(
     table = l["table"], 
     keys = sorted(l.keys()) 
    ))).distinct().toPandas() 
table_cols = table_cols.set_index("table") 
table_cols.to_dict()["keys"] 

    {u'bar': [u'c', u'd', u'table'], u'foo': [u'a', u'b', u'table']} 
0

步骤如下:

  1. 地图每个文本字符串JSON。

    jsonRdd = sc.textFile(os.path.join("/path/to/data", "*")).map (.....) 
    
  2. 获取所有不同的表名给驱动程序。

    tables = jsonRdd.map(<extract table name only from json object >).distinct().collect() 
    
  3. 迭代通过各(步骤2)的表并过滤主要jsonRdd来创建单个表RDD。

    tablesRDD=[] 
    for table in tables: 
        # categorize each main rdd record based on table name. 
        # Compare each json object table element with for loop table string and on successful match return true. 
        output.append(jasonRdd.filter(lambda jsonObj: jsonObj['table'] == table)) 
    

我不是Python开发如此精确的代码片段可能无法正常工作的。