我试图从Kinesis处理Json字符串。 Json字符串可以有几种不同的形式。从室壁运动,我创建了一个DSTREAM:Spark Streaming Scala结合不同结构的json形成一个DataFrame
val kinesisStream = KinesisUtils.createStream(
ssc, appName, "Kinesis_Stream", "kinesis.ap-southeast-1.amazonaws.com",
"region", InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
val lines = kinesisStream.map(x => new String(x))
lines.foreachRDD((rdd, time) =>{
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits.StringToColumn
if(rdd.count() > 0){
// Process jsons here
// Json strings here would have either one of the formats below
}
})
的RDD串会有这些JSON字符串中的任何一个。 收藏:
[
{
"data": {
"ApplicationVersion": "1.0.3 (65)",
"ProjectId": 30024,
"TargetId": "4138",
"Timestamp": 0
},
"host": "host1"
},
{
"data": {
"ApplicationVersion": "1.0.3 (65)",
"ProjectId": 30025,
"TargetId": "4139",
"Timestamp": 0
},
"host": "host1"
}
]
和一些Json的字符串,如单对象,以便:
{
"ApplicationVersion": "1.0.3 (65)",
"ProjectId": 30026,
"TargetId": "4140",
"Timestamp": 0
}
我希望能够从“数据”提取对象的关键,如果它是第一种类型的JSON字符串并结合第二种类型的Json并形成一个RDD/DataFrame,我该如何实现这一点?
最后,我想我的数据帧是这样的:
+------------------+---------+--------+---------+
|ApplicationVersion|ProjectId|TargetId|Timestamp|
+------------------+---------+--------+---------+
| 1.0.3 (65)| 30024| 4138| 0|
| 1.0.3 (65)| 30025| 4139| 0|
| 1.0.3 (65)| 30026| 4140| 0|
+------------------+---------+--------+---------+
对不起,新斯卡拉和火花。我一直在寻找现有的例子,但不幸的是没有找到解决方案。
非常感谢提前。
感谢您的快速响应!对不起,我忘了提及我正在使用Spark Streaming DStreams,我已经更新了我的问题。你的回应仍然有帮助! – j3tr1
如果你能够从你的DStream中提取字符串,代码应该或多或少的工作。 – philantrovert
谢谢!这通过使用json4s指出了我的正确方向。这允许我在转换为DF之前先处理json字符串 – j3tr1