2017-04-22 56 views
2

我有一个返回数据的SQL查询的数据帧如何转换平坦的数据帧到火花嵌套JSON(斯卡拉或Java)

id,type,name,ppu,batter.id,batter.type,topping.id,topping.type 
101,donut,cake,0_55,1001,Regular,5001,None 
101,donut,cake,0_55,1002,Chocolate,5001,None 
101,donut,cake,0_55,1003,Blueberry,5001,None 
101,donut,cake,0_55,1004,Devil's Food,5001,None 
101,donut,cake,0_55,1001,Regular,5002,Glazed 
101,donut,cake,0_55,1002,Chocolate,5002,Glazed 
101,donut,cake,0_55,1003,Blueberry,5002,Glazed 
101,donut,cake,0_55,1004,Devil's Food,5002,Glazed 
101,donut,cake,0_55,1001,Regular,5003,Chocolate 
101,donut,cake,0_55,1002,Chocolate,5003,Chocolate 
101,donut,cake,0_55,1003,Blueberry,5003,Chocolate 
101,donut,cake,0_55,1004,Devil's Food,5003,Chocolate 

这样设置可我需要覆盖到嵌套这json结构像这样。

{ 
    "id": "101", 
    "type": "donut", 
    "name": "Cake", 
    "ppu": 0.55, 
    "batter": 
     [ 
      { "id": "1001", "type": "Regular" }, 
      { "id": "1002", "type": "Chocolate" }, 
      { "id": "1003", "type": "Blueberry" }, 
      { "id": "1004", "type": "Devil's Food" } 
     ], 
    "topping": 
     [ 
      { "id": "5001", "type": "None" }, 
      { "id": "5002", "type": "Glazed" }, 
      { "id": "5003", "type": "Chocolate" } 
     ] 
} 

我们是否有可能在Dataframe聚合或自定义转换中执行此操作,我必须编写它。

在这里找到了类似的问题 Writing nested JSON in spark scala 但是没有相当正确的答案。

回答

0

所以,显然没有直接的方式通过数据帧API来完成这项任务。你可以使用

df.toJson.{..} 

但它不会给你你想要的输出。

你必须写一个混乱的变换,我很想听听任何其他可能的解决方案。我假设你的结果适合内存,因为它必须返回给驱动程序。另外,我在这里使用Gson API来创建json。

def arrToJson(arr: Array[Row]): JsonObject = { 
    val jo = new JsonObject 
    arr.map(row => ((row(0) + "," + row(1) + "," + row(2) + "," + row(3)), 
     (row(4) + "," + row(5) + "," + row(6) + "," + row(7)))) 
     .groupBy(_._1).map(f => (f._1.split(","), f._2.map(_._2.split(",")))) 
     .foreach { x => 
     { 

      jo.addProperty("id", x._1(0)) 
      jo.addProperty("type", x._1(1)) 
      jo.addProperty("name", x._1(2)) 
      jo.addProperty("ppu", x._1(3)) 

      val bja = new JsonArray 
      val tja = new JsonArray 
      x._2.foreach(f => { 
      val bjo = new JsonObject 
      val tjo = new JsonObject 

      bjo.addProperty("id", f(0)) 
      bjo.addProperty("type", f(1)) 

      tjo.addProperty("id", f(2)) 
      tjo.addProperty("type", f(3)) 

      bja.add(bjo) 
      tja.add(tjo) 
      }) 
      jo.add("batter", bja) 
      jo.add("topping", tja) 

     } 
     } 

    jo 
    } 
+0

谢谢Chitral,但不幸的是,在我的情况下,对于100列和至少2层嵌套,这可能会变得很复杂。另外在我的情况下,数据可能不适合驱动程序,所以我期待在分布式节点中进行转换。我在某种程度上可以使用UDAF,但只能用于只有一个嵌套对象的简单对象。我听说Spark 2.0对此有更好的支持。但不确定。再次感谢您的努力:) – BRKumaran