2017-07-14 124 views
3

我试图从Kinesis处理Json字符串。 Json字符串可以有几种不同的形式。从室壁运动,我创建了一个DSTREAM:Spark Streaming Scala结合不同结构的json形成一个DataFrame

val kinesisStream = KinesisUtils.createStream(
ssc, appName, "Kinesis_Stream", "kinesis.ap-southeast-1.amazonaws.com", 
"region", InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2) 

val lines = kinesisStream.map(x => new String(x)) 

lines.foreachRDD((rdd, time) =>{ 

    val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) 
    import sqlContext.implicits.StringToColumn 

    if(rdd.count() > 0){ 
    // Process jsons here 
    // Json strings here would have either one of the formats below 
    } 
}) 

的RDD串会有这些JSON字符串中的任何一个。 收藏:

[ 
    { 
    "data": { 
     "ApplicationVersion": "1.0.3 (65)", 
     "ProjectId": 30024, 
     "TargetId": "4138", 
     "Timestamp": 0 
    }, 
    "host": "host1" 
    }, 
    { 
    "data": { 
     "ApplicationVersion": "1.0.3 (65)", 
     "ProjectId": 30025, 
     "TargetId": "4139", 
     "Timestamp": 0 
    }, 
    "host": "host1" 
    } 
] 

和一些Json的字符串,如单对象,以便:

{ 
     "ApplicationVersion": "1.0.3 (65)", 
     "ProjectId": 30026, 
     "TargetId": "4140", 
     "Timestamp": 0 
} 

我希望能够从“数据”提取对象的关键,如果它是第一种类型的JSON字符串并结合第二种类型的Json并形成一个RDD/DataFrame,我该如何实现这一点?

最后,我想我的数据帧是这样的:

+------------------+---------+--------+---------+ 
|ApplicationVersion|ProjectId|TargetId|Timestamp| 
+------------------+---------+--------+---------+ 
|  1.0.3 (65)| 30024| 4138|  0| 
|  1.0.3 (65)| 30025| 4139|  0| 
|  1.0.3 (65)| 30026| 4140|  0| 
+------------------+---------+--------+---------+ 

对不起,新斯卡拉和火花。我一直在寻找现有的例子,但不幸的是没有找到解决方案。

非常感谢提前。

回答

0

本例使用json4s

import org.json4s._ 
import org.json4s.jackson.JsonMethods._ 

implicit val format = DefaultFormats 

case class jsonschema (ApplicationVersion: String, ProjectId: String, TargetId: String, Timestamp:Int) 

val string1 = """ 
[ { 
    "data" : { 
    "ApplicationVersion" : "1.0.3 (65)", 
    "ProjectId" : 30024, 
    "TargetId" : "4138", 
    "Timestamp" : 0 
    }, 
    "host" : "host1" 
}, { 
    "data" : { 
    "ApplicationVersion" : "1.0.3 (65)", 
    "ProjectId" : 30025, 
    "TargetId" : "4139", 
    "Timestamp" : 0 
    }, 
    "host" : "host1" 
} ] 

""" 

val string2 = """ 
[ { 
    "ApplicationVersion" : "1.0.3 (65)", 
    "ProjectId" : 30025, 
    "TargetId" : "4140", 
    "Timestamp" : 0 
}, { 
    "ApplicationVersion" : "1.0.3 (65)", 
    "ProjectId" : 30025, 
    "TargetId" : "4141", 
    "Timestamp" : 0 
} ] 
""" 

val json1 = (parse(string1) \ "data").extract[List[jsonschema]] 

val json2 = parse(string2).extract[List[jsonschema]] 

val jsonRDD = json1.union(json2) 

val df = sqlContext.createDataFrame(jsonRDD) 

df.show 


+------------------+---------+--------+---------+ 
|ApplicationVersion|ProjectId|TargetId|Timestamp| 
+------------------+---------+--------+---------+ 
|  1.0.3 (65)| 30024| 4138|  0| 
|  1.0.3 (65)| 30025| 4139|  0| 
|  1.0.3 (65)| 30025| 4140|  0| 
|  1.0.3 (65)| 30025| 4141|  0| 
+------------------+---------+--------+---------+ 
+0

感谢您的快速响应!对不起,我忘了提及我正在使用Spark Streaming DStreams,我已经更新了我的问题。你的回应仍然有帮助! – j3tr1

+0

如果你能够从你的DStream中提取字符串,代码应该或多或少的工作。 – philantrovert

+0

谢谢!这通过使用json4s指出了我的正确方向。这允许我在转换为DF之前先处理json字符串 – j3tr1

0

你可以从第一Dataframe选择data.*列后用工会:

val spark = SparkSession.builder().master("local[*]").getOrCreate()  
val sc = spark.sparkContext 

// Assuming you store your jsons in two separate strings `json1` and `json2` 
val df1 = spark.read.json(sc.parallelize(Seq(json1))) 
val df2 = spark.read.json(sc.parallelize(Seq(json2))) 

import spark.implicits._ 
df1.select($"data.*") // Select only the data columns from first Dataframe 
    .union(df2)   // Union the two Dataframes as they have the same structure 
    .show() 

EDIT [其他解决方案链接]

后您编辑您的问题,我理解你需要某种形式的解析JSON文件时的回退机制。有更多的方法可以使用任何JSON解析库来做到这一点 - 有一个很好的解决方案here与Play,我认为它已经解释了如何以优雅的方式解决这个问题。

一旦你有一个RDD[Data]其中数据是你的“变种”类型,你可以简单地将它转换成Dataframe使用rdd.toDF()

希望有所帮助。

+0

感谢的快速反应安德烈,我很欣赏它,是非常有用的!对不起,我忘了提及我正在使用Spark Streaming DStreams,我已经更新了上面的问题。 – j3tr1

+0

我明白了。这是一个简单的方法来知道哪个对象来的时候? –

+0

不幸的是没有 – j3tr1