我有几个XML文件&我已经把这些放在卡夫卡主题&我已经创建了卡夫卡主题的Dstream对象。由于我想解析主题中的xml数据,因此我无法进一步处理。如果任何在Spark流中处理过xml处理的人都可以给我提供他们的帮助。我在过去的两天里一直坚持这一点。Kafka Spark流XML解析/处理
我采取的方法是XML文件 - >卡夫卡主题 - >在Spark流中处理 - >再次将它放回卡夫卡。
我能够将数据放回卡夫卡话题,但无法处理或做火花流中主题的数据。
我有几个XML文件&我已经把这些放在卡夫卡主题&我已经创建了卡夫卡主题的Dstream对象。由于我想解析主题中的xml数据,因此我无法进一步处理。如果任何在Spark流中处理过xml处理的人都可以给我提供他们的帮助。我在过去的两天里一直坚持这一点。Kafka Spark流XML解析/处理
我采取的方法是XML文件 - >卡夫卡主题 - >在Spark流中处理 - >再次将它放回卡夫卡。
我能够将数据放回卡夫卡话题,但无法处理或做火花流中主题的数据。
您期待什么样的处理?如果您期待任何一种数据提取,您可以做的是,foreach消息,将它们转换为json(xml to json非常简单),并将jsonRDD和JsonRDD转换为DF直接转换。因此,您将能够在数据框上进行任何选择或进行其他操作。
我需要你几个输入端,以提供准确的解决方案
1)你想出来的数据是什么? 2)Dataframe中的数据是否足够。?
如果你能够解释输入,这将是非常有益的。
我已经添加了一个示例代码来获取数据框的XML数据。
val jsonStream = kafkaStream.transform(
y => {
y.filter(x => x._1 != null && x._2 != null).map(x => {
XML.toJSONObject(x).toString(4);
}
)
})
jsonStream.foreachRDD(x => {
val sqlContext = SQLContextSingleton.getInstance(x.sparkContext)
if (x != null) {
val df = sqlContext.read.json(x)
// Your DF Operations
}
}
}
)
object SQLContextSingleton {
@transient private var instance: HiveContext = _
def getInstance(sparkContext: SparkContext): HiveContext = {
if (instance == null) {
sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false");
sparkContext.hadoopConfiguration.set("spark.sql.parquet.mergeSchema", "true");
sparkContext.hadoopConfiguration.set("spark.sql.parquet.cacheMetadata","false");
instance = new HiveContext(sparkContext)
}
instance
}
}
嗨Srini,感谢您的快速回复。这个问题已经解决,这是一个非常复杂的用例,我们想要使用火花流连接3种类型的xml。终于完成了。我们使用JAXB来验证xml的各自的模式。正如我所说的,这个用例非常复杂,有很多编码,因此我没有分享任何适合我的代码。再一次感谢你。 – Harsha
你可以添加你的代码,并具体问题是什么以及你得到什么错误或异常? – maasg
@Harsha在阅读来自kafka的消息时遇到了同样的问题,即将消息作为每个标记作为消息来获取。你能否让我知道你是如何解决这个问题的。 –
@ankush reddy使用JAXB来验证XML的各自模式 – Harsha