2017-06-15 201 views
0

我使用Spark Streaming与Scala,并从卡夫卡获取json记录。我想解析它,以便我可以获取值(日期时间和质量)和过程。Scala解析来自kafka的json记录

这里是我的代码:

stream.foreachRDD(rdd => { 
    rdd.collect().foreach(i => 
    println(msgParse(i.value()).quality) 
) 
}) 

而且我有这样的情况下,阶级和我解析功能:

case class diskQuality(datetime: String , quality : Double) extends Serializable 

def msgParse(value: String): diskQuality = { 

    import org.json4s._ 
    import org.json4s.native.JsonMethods._ 

    implicit val formats = DefaultFormats 

    val res = parse(value).extract[diskQuality] 
    return res 

} 

我已经添加了这种相关性:

libraryDependencies += "org.json4s" % "json4s-native_2.10" % "3.2.4" 

的记录我收到此格式:

"{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}" 

但是我得到这个错误:

Exception in thread "main" org.json4s.ParserUtil$ParseException: expected field or array Near: ,\"quality\":100.0}" 

编辑:

当我尝试解析使用它的工作原理相同的功能如下。但是,即使卡夫卡消息都以相同的格式,但它仍然给出了同样的错误:

val test = "{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}" 

我使用scalaVersion:=“2.10.6”和json4s-native_2.10"

任何帮助。将非常感激。谢谢你们的时间

+0

第一种格式是正确的 - “{\”datetime \“:\”14-05-2017 14:18:30 \“,\”quality \“:92.6}”。而你的代码也适用于它。你能否检查一下build.sbt中的Scala版本是什么。 org.json4s依赖是2.10吗?此外,您可以记录msgParse函数的值参数,以检查它的实际值。 –

+0

感谢您的回复,我编辑了我的问题,这是我打印msgParse时的值:“{\”datetime \“:\”24-04-2017 07:53:30 \“,\”quality \“:100.0}” – AsmaaM

+0

@AsmaaM如果这是您的控制台输出 - 您在引号转义时遇到问题,您能否检查您的制作人发送给kafka的内容? – ledniov

回答

1

看起来你对你的卡夫卡制片方有问题,你必须通过更换转义引号与以下格式来结束:

{"datetime":"14-05-2017 14:18:30","quality":92.6}

它会给你格式正确的JSON字符串。

+0

一切正常吧!再次感谢你 – AsmaaM