我使用Spark Streaming与Scala,并从卡夫卡获取json记录。我想解析它,以便我可以获取值(日期时间和质量)和过程。Scala解析来自kafka的json记录
这里是我的代码:
stream.foreachRDD(rdd => {
rdd.collect().foreach(i =>
println(msgParse(i.value()).quality)
)
})
而且我有这样的情况下,阶级和我解析功能:
case class diskQuality(datetime: String , quality : Double) extends Serializable
def msgParse(value: String): diskQuality = {
import org.json4s._
import org.json4s.native.JsonMethods._
implicit val formats = DefaultFormats
val res = parse(value).extract[diskQuality]
return res
}
我已经添加了这种相关性:
libraryDependencies += "org.json4s" % "json4s-native_2.10" % "3.2.4"
的记录我收到此格式:
"{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}"
但是我得到这个错误:
Exception in thread "main" org.json4s.ParserUtil$ParseException: expected field or array Near: ,\"quality\":100.0}"
编辑:
当我尝试解析使用它的工作原理相同的功能如下。但是,即使卡夫卡消息都以相同的格式,但它仍然给出了同样的错误:
val test = "{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}"
我使用scalaVersion:=“2.10.6”和json4s-native_2.10"
任何帮助。将非常感激。谢谢你们的时间
第一种格式是正确的 - “{\”datetime \“:\”14-05-2017 14:18:30 \“,\”quality \“:92.6}”。而你的代码也适用于它。你能否检查一下build.sbt中的Scala版本是什么。 org.json4s依赖是2.10吗?此外,您可以记录msgParse函数的值参数,以检查它的实际值。 –
感谢您的回复,我编辑了我的问题,这是我打印msgParse时的值:“{\”datetime \“:\”24-04-2017 07:53:30 \“,\”quality \“:100.0}” – AsmaaM
@AsmaaM如果这是您的控制台输出 - 您在引号转义时遇到问题,您能否检查您的制作人发送给kafka的内容? – ledniov