2016-11-18 121 views
2

我有一个需求来处理流入S3文件夹的xml文件。目前,我已经实施如下。Spark Streaming xml文件

首先,利用星火的FILESTREAM

val data = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3://myfolder/",(t: org.apache.hadoop.fs.Path) => true, newFilesOnly = true, hadoopConf).map(_._2.toString())

对于每个RDD,检查是否有文件被读

if (data.count() !=0) 

写入字符串到一个新的HDFS目录

阅读文件

从上面的HDFS di创建数据帧读取教区长

val loaddata = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Trans").load(sdir) 

请在数据帧进行一些处理,并保存为JSON

loaddata.write.mode("append").json("s3://mybucket/somefolder") 

不知怎的,我觉得上面的方法是非常低效的,坦率地挺孩子气的学校。 有更好的解决方案吗?任何帮助将不胜感激。

后续问题: 如何操纵数据框中的字段(不是列)? 我有一个复杂的嵌套xml,当我使用上面描述的方法时,我得到了一个有9列和50个奇数内部Struct数组的数据框。除了需要修剪某些字段名称外,这很好。有没有办法在不爆炸数据框的情况下实现这一点,因为我需要再次构建相同的结构?

回答

1

如果您使用的Spark 2.0你可以使其与结构化工作流:

val inputDF = spark.readStream.format("com.databricks.spark.xml") 
    .option("rowTag", "Trans") 
    .load(path) 
+0

非常感谢。我的目标环境是使用Spark 2.0.1的EMR堆栈。我会在EMR盒上尝试你的建议。 – Vamsi

+0

如果你对上面提到的解决方案没问题,请投票/接受。 –