2017-05-28 52 views
1

我嵌套了JSON,并希望以表格结构输出。我能够单独解析JSON值,但在制表中存在一些问题。我可以通过数据框轻松完成。但我想用“RDD ONLY”功能来做。任何帮助非常感谢。使用Spark-Scala将表格结构压扁JSON RDD only fucntion

输入JSON:

{ "level":{"productReference":{ 

    "prodID":"1234", 

    "unitOfMeasure":"EA" 

    }, 

    "states":[ 
    { 
     "state":"SELL", 
     "effectiveDateTime":"2015-10-09T00:55:23.6345Z", 
     "stockQuantity":{ 
      "quantity":1400.0, 
      "stockKeepingLevel":"A" 
     } 
    }, 
    { 
     "state":"HELD", 
     "effectiveDateTime":"2015-10-09T00:55:23.6345Z", 
     "stockQuantity":{ 
      "quantity":800.0, 
      "stockKeepingLevel":"B" 
     } 
    } 
    ] }} 

预期输出:

enter image description here

我试过下面星火代码。但获取像这样的输出和Row()对象不能解析这个。

079562193 EA,List(SELLABLE,HELD),List(2015-10-09T00:55:23.6345Z,2015-10-09T00:55:23.6345Z),List(1400.0,800.0),List(SINGLE ,单)

def main(Args : Array[String]): Unit = { 

    val conf = new SparkConf().setAppName("JSON Read and Write using Spark RDD").setMaster("local[1]") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 

    val salesSchema = StructType(Array(
    StructField("prodID", StringType, true), 
    StructField("unitOfMeasure", StringType, true), 
    StructField("state", StringType, true), 
    StructField("effectiveDateTime", StringType, true), 
    StructField("quantity", StringType, true), 
    StructField("stockKeepingLevel", StringType, true) 
)) 

    val ReadAlljsonMessageInFile_RDD = sc.textFile("product_rdd.json") 

    val x = ReadAlljsonMessageInFile_RDD.map(eachJsonMessages => { 

     parse(eachJsonMessages) 

     }).map(insideEachJson=>{ 
     implicit val formats = org.json4s.DefaultFormats 

     val prodID = (insideEachJson\ "level" \"productReference" \"TPNB").extract[String].toString 
     val unitOfMeasure = (insideEachJson\ "level" \ "productReference" \"unitOfMeasure").extract[String].toString 

     val state= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
      map(x=>(x\"state").extract[String]).toString() 
     val effectiveDateTime= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
     map(x=>(x\"effectiveDateTime").extract[String]).toString 
     val quantity= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
     map(x=>(x\"stockQuantity").extract[JValue]).map(x=>(x\"quantity").extract[Double]). 
     toString 
     val stockKeepingLevel= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
     map(x=>(x\"stockQuantity").extract[JValue]).map(x=>(x\"stockKeepingLevel").extract[String]). 
     toString 

     //Row(prodID,unitOfMeasure,state,effectiveDateTime,quantity,stockKeepingLevel) 

    println(prodID,unitOfMeasure,state,effectiveDateTime,quantity,stockKeepingLevel) 

     }).collect() 

    // sqlContext.createDataFrame(x,salesSchema).show(truncate = false) 

} 
+0

什么问题阻止了它的工作?看到适当的异常,编译器错误,无论如何都很难诊断问题。 – Phasmid

+0

感谢您查看问题。行对象不能表示它。这就是我刚才把打印语句弄清楚的原因。我定义的Schema和我传递的Row()对象不匹配,所以我希望有任何帮助来解决这个问题。 –

回答

0

有2个版本的解决方案,您的问题。

版本1:

def main(Args : Array[String]): Unit = { 

    val conf = new SparkConf().setAppName("JSON Read and Write using Spark RDD").setMaster("local[1]") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 

    val salesSchema = StructType(Array(
    StructField("prodID", StringType, true), 
    StructField("unitOfMeasure", StringType, true), 
    StructField("state", StringType, true), 
    StructField("effectiveDateTime", StringType, true), 
    StructField("quantity", StringType, true), 
    StructField("stockKeepingLevel", StringType, true) 
)) 

    val ReadAlljsonMessageInFile_RDD = sc.textFile("product_rdd.json")  

    val x = ReadAlljsonMessageInFile_RDD.map(eachJsonMessages => { 

    parse(eachJsonMessages) 

    }).map(insideEachJson=>{ 
    implicit val formats = org.json4s.DefaultFormats 

    val prodID = (insideEachJson\ "level" \"productReference" \"prodID").extract[String].toString 
    val unitOfMeasure = (insideEachJson\ "level" \ "productReference" \"unitOfMeasure").extract[String].toString 

    val state= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
     map(x=>(x\"state").extract[String]).toString() 
    val effectiveDateTime= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
    map(x=>(x\"effectiveDateTime").extract[String]).toString 
    val quantity= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
    map(x=>(x\"stockQuantity").extract[JValue]).map(x=>(x\"quantity").extract[Double]). 
    toString 
    val stockKeepingLevel= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
    map(x=>(x\"stockQuantity").extract[JValue]).map(x=>(x\"stockKeepingLevel").extract[String]). 
    toString 

    Row(prodID,unitOfMeasure,state,effectiveDateTime,quantity,stockKeepingLevel) 

    }) 

    sqlContext.createDataFrame(x,salesSchema).show(truncate = false) 

} 

这会给你以下的输出:

+------+-------------+----------------+----------------------------------------------------------+-------------------+-----------------+ 
|prodID|unitOfMeasure|state   |effectiveDateTime           |quantity   |stockKeepingLevel| 
+------+-------------+----------------+----------------------------------------------------------+-------------------+-----------------+ 
|1234 |EA   |List(SELL, HELD)|List(2015-10-09T00:55:23.6345Z, 2015-10-09T00:55:23.6345Z)|List(1400.0, 800.0)|List(A, B)  | 
+------+-------------+----------------+----------------------------------------------------------+-------------------+-----------------+ 

2版

def main(Args : Array[String]): Unit = { 

    val conf = new SparkConf().setAppName("JSON Read and Write using Spark RDD").setMaster("local[1]") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 

    val salesSchema = StructType(Array(
    StructField("prodID", StringType, true), 
    StructField("unitOfMeasure", StringType, true), 
    StructField("state", ArrayType(StringType, true), true), 
    StructField("effectiveDateTime", ArrayType(StringType, true), true), 
    StructField("quantity", ArrayType(DoubleType, true), true), 
    StructField("stockKeepingLevel", ArrayType(StringType, true), true) 
)) 

    val ReadAlljsonMessageInFile_RDD = sc.textFile("product_rdd.json")  

    val x = ReadAlljsonMessageInFile_RDD.map(eachJsonMessages => { 

    parse(eachJsonMessages) 

    }).map(insideEachJson=>{ 
    implicit val formats = org.json4s.DefaultFormats 

    val prodID = (insideEachJson\ "level" \"productReference" \"prodID").extract[String].toString 
    val unitOfMeasure = (insideEachJson\ "level" \ "productReference" \"unitOfMeasure").extract[String].toString 

    val state= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
     map(x=>(x\"state").extract[String]) 
    val effectiveDateTime= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
    map(x=>(x\"effectiveDateTime").extract[String]) 
    val quantity= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
    map(x=>(x\"stockQuantity").extract[JValue]).map(x=>(x\"quantity").extract[Double]) 
    val stockKeepingLevel= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
    map(x=>(x\"stockQuantity").extract[JValue]).map(x=>(x\"stockKeepingLevel").extract[String]) 

    Row(prodID,unitOfMeasure,state,effectiveDateTime,quantity,stockKeepingLevel) 

    }) 


    sqlContext.createDataFrame(x,salesSchema).show(truncate = false) 

} 

这会给你以下的输出:

+------+-------------+------------+------------------------------------------------------+---------------+-----------------+ 
|prodID|unitOfMeasure|state  |effectiveDateTime          |quantity  |stockKeepingLevel| 
+------+-------------+------------+------------------------------------------------------+---------------+-----------------+ 
|1234 |EA   |[SELL, HELD]|[2015-10-09T00:55:23.6345Z, 2015-10-09T00:55:23.6345Z]|[1400.0, 800.0]|[A, B]   | 
+------+-------------+------------+------------------------------------------------------+---------------+-----------------+ 

版本1 & 2之间的区别是模式。在版本1中,您正将每一列投射到String,而在版本2中,它们将投射到Array

+0

非常感谢这个解决方案。但是我希望将输出记录分成两行,这是我在问题陈述的“预期输出:”部分中指定的方式。有没有什么办法可以让它传递Row()对象。您在版本2中提到的输出,需要再次平铺,这是可行的。但是我更喜欢扁平行,而将它传递给Row()对象。我真的很感谢你对这个问题的兴趣 –

0

DataFrameDataSetrddoptimized这里面有很多的options以尽量达到我们想要的解决方案。

在我看来,DataFrame的开发目的是为了让开发人员能够以表格的形式轻松查看数据,以便轻松实现逻辑。所以我总是建议用户使用dataframedataset

谈话要少得多,我使用dataframe在下面发布您的解决方案。一旦你有一个dataframe,切换到rdd是非常容易的。

您所需的解决方案是低于(你必须找到一种方法来读取json文件作为其下面json string做:那你:)好运气的分配)

import org.apache.spark.sql.functions._ 
val json = """ { "level":{"productReference":{ 

        "prodID":"1234", 

        "unitOfMeasure":"EA" 

       }, 

       "states":[ 
        { 
        "state":"SELL", 
        "effectiveDateTime":"2015-10-09T00:55:23.6345Z", 
        "stockQuantity":{ 
         "quantity":1400.0, 
         "stockKeepingLevel":"A" 
        } 
        }, 
        { 
        "state":"HELD", 
        "effectiveDateTime":"2015-10-09T00:55:23.6345Z", 
        "stockQuantity":{ 
         "quantity":800.0, 
         "stockKeepingLevel":"B" 
        } 
        } 
       ] }}""" 

val rddJson = sparkContext.parallelize(Seq(json)) 
var df = sqlContext.read.json(rddJson) 
df = df.withColumn("prodID", df("level.productReference.prodID")) 
    .withColumn("unitOfMeasure", df("level.productReference.unitOfMeasure")) 
    .withColumn("states", explode(df("level.states"))) 
    .drop("level") 
df = df.withColumn("state", df("states.state")) 
    .withColumn("effectiveDateTime", df("states.effectiveDateTime")) 
    .withColumn("quantity", df("states.stockQuantity.quantity")) 
    .withColumn("stockKeepingLevel", df("states.stockQuantity.stockKeepingLevel")) 
    .drop("states") 
df.show(false) 

这会给放出来作为

+------+-------------+-----+-------------------------+--------+-----------------+ 
|prodID|unitOfMeasure|state|effectiveDateTime  |quantity|stockKeepingLevel| 
+------+-------------+-----+-------------------------+--------+-----------------+ 
|1234 |EA   |SELL |2015-10-09T00:55:23.6345Z|1400.0 |A    | 
|1234 |EA   |HELD |2015-10-09T00:55:23.6345Z|800.0 |B    | 
+------+-------------+-----+-------------------------+--------+-----------------+ 

现在你已经所需的输出为dataframe转换为rdd只是调用.rdd

df.rdd.foreach(println) 

会给输出如下

[1234,EA,SELL,2015-10-09T00:55:23.6345Z,1400.0,A] 
[1234,EA,HELD,2015-10-09T00:55:23.6345Z,800.0,B] 

我希望这是有益低于

+0

嗨ramesh,非常感谢。我已经使用数据框完成了这个解决方案。我来自sql背景和daraframe对我来说非常简单。我想到学习Scala,所以想尝试使用RDD只有功能:) –

+0

很高兴听到@RohanNayak。那么你一定找到了更好的解决方案。 :) 为你感到高兴。谢谢你让我知道。你为什么不张贴你的答案,以便我也从中学习。 :) –

+0

嗨,我发布了我的解决方案。 :) –

1

HI是“数据框”这是我开发的唯一解决方案。寻找完整的“RDD ONLY”解决方案

 
def main (Args : Array[String]):Unit = { 

    val conf = new SparkConf().setAppName("JSON Read and Write using Spark DataFrame few more options").setMaster("local[1]") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 

    val sourceJsonDF = sqlContext.read.json("product.json") 

     val jsonFlatDF_level = sourceJsonDF.withColumn("explode_states",explode($"level.states")) 
     .withColumn("explode_link",explode($"level._link")) 
     .select($"level.productReference.TPNB".as("TPNB"), 
     $"level.productReference.unitOfMeasure".as("level_unitOfMeasure"), 
     $"level.locationReference.location".as("level_location"), 
     $"level.locationReference.type".as("level_type"), 
     $"explode_states.state".as("level_state"), 
     $"explode_states.effectiveDateTime".as("level_effectiveDateTime"), 
     $"explode_states.stockQuantity.quantity".as("level_quantity"), 
     $"explode_states.stockQuantity.stockKeepingLevel".as("level_stockKeepingLevel"), 
     $"explode_link.rel".as("level_rel"), 
     $"explode_link.href".as("level_href"), 
     $"explode_link.method".as("level_method")) 
jsonFlatDF_oldLevel.show() 

    } 
+1

伟大的思维方式:)谢谢。我赞成这个答案。 :) –