2016-06-28 95 views
-1

我有几个XML文件&我已经把这些放在卡夫卡主题&我已经创建了卡夫卡主题的Dstream对象。由于我想解析主题中的xml数据,因此我无法进一步处理。如果任何在Spark流中处理过xml处理的人都可以给我提供他们的帮助。我在过去的两天里一直坚持这一点。Kafka Spark流XML解析/处理

我采取的方法是XML文件 - >卡夫卡主题 - >在Spark流中处理 - >再次将它放回卡夫卡。

我能够将数据放回卡夫卡话题,但无法处理或做火花流中主题的数据。

+0

你可以添加你的代码,并具体问题是什么以及你得到什么错误或异常? – maasg

+0

@Harsha在阅读来自kafka的消息时遇到了同样的问题,即将消息作为每个标记作为消息来获取。你能否让我知道你是如何解决这个问题的。 –

+0

@ankush reddy使用JAXB来验证XML的各自模式 – Harsha

回答

0

您期待什么样的处理?如果您期待任何一种数据提取,您可以做的是,foreach消息,将它们转换为json(xml to json非常简单),并将jsonRDD和JsonRDD转换为DF直接转换。因此,您将能够在数据框上进行任何选择或进行其他操作。

我需要你几个输入端,以提供准确的解决方案

1)你想出来的数据是什么? 2)Dataframe中的数据是否足够。?

如果你能够解释输入,这将是非常有益的。

我已经添加了一个示例代码来获取数据框的XML数据。

val jsonStream = kafkaStream.transform(
     y => { 
     y.filter(x => x._1 != null && x._2 != null).map(x => { 
      XML.toJSONObject(x).toString(4); 
     } 
     ) 
     }) 


jsonStream.foreachRDD(x => { 
     val sqlContext = SQLContextSingleton.getInstance(x.sparkContext) 
     if (x != null) { 

     val df = sqlContext.read.json(x) 
     // Your DF Operations 
     } 
     } 
    } 
) 

object SQLContextSingleton { 

    @transient private var instance: HiveContext = _ 

    def getInstance(sparkContext: SparkContext): HiveContext = { 
    if (instance == null) { 
     sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false"); 
     sparkContext.hadoopConfiguration.set("spark.sql.parquet.mergeSchema", "true"); 
     sparkContext.hadoopConfiguration.set("spark.sql.parquet.cacheMetadata","false"); 
     instance = new HiveContext(sparkContext) 
    } 
    instance 
    } 
} 
+0

嗨Srini,感谢您的快速回复。这个问题已经解决,这是一个非常复杂的用例,我们想要使用火花流连接3种类型的xml。终于完成了。我们使用JAXB来验证xml的各自的模式。正如我所说的,这个用例非常复杂,有很多编码,因此我没有分享任何适合我的代码。再一次感谢你。 – Harsha