我有一个需求来处理流入S3文件夹的xml文件。目前,我已经实施如下。Spark Streaming xml文件
首先,利用星火的FILESTREAM
val data = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3://myfolder/",(t: org.apache.hadoop.fs.Path) => true, newFilesOnly = true, hadoopConf).map(_._2.toString())
对于每个RDD,检查是否有文件被读
if (data.count() !=0)
写入字符串到一个新的HDFS目录
阅读文件从上面的HDFS di创建数据帧读取教区长
val loaddata = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Trans").load(sdir)
请在数据帧进行一些处理,并保存为JSON
loaddata.write.mode("append").json("s3://mybucket/somefolder")
不知怎的,我觉得上面的方法是非常低效的,坦率地挺孩子气的学校。 有更好的解决方案吗?任何帮助将不胜感激。
后续问题: 如何操纵数据框中的字段(不是列)? 我有一个复杂的嵌套xml,当我使用上面描述的方法时,我得到了一个有9列和50个奇数内部Struct数组的数据框。除了需要修剪某些字段名称外,这很好。有没有办法在不爆炸数据框的情况下实现这一点,因为我需要再次构建相同的结构?
非常感谢。我的目标环境是使用Spark 2.0.1的EMR堆栈。我会在EMR盒上尝试你的建议。 – Vamsi
如果你对上面提到的解决方案没问题,请投票/接受。 –