2017-02-17 1792 views
1

我正在写一个Spark/Scala程序来读取ZIP文件,将它们解压缩并将内容写入一组新文件。我可以将其写入本地文件系统,但是想知道是否有办法将输出文件写入分布式文件系统(如HDFS)。代码显示below`在Spark/Scala中写入HDFS

import java.util.zip.ZipInputStream 
import org.apache.spark.input.PortableDataStream 
import java.io._ 

var i =1 
sc.binaryFiles("file:///d/tmp/zips/").flatMap((file: (String,  PortableDataStream)) => 
    { 


    val zipStream = new ZipInputStream(file._2.open)    
    val entry = zipStream.getNextEntry        
    val iter = scala.io.Source.fromInputStream(zipStream).getLines   

    val fname = f"/d/tmp/myfile$i.txt" 


    i = i + 1 

    val xx = iter.mkString 
    val writer = new PrintWriter(new File(fname)) 
    writer.write(xx) 
    writer.close() 

    iter              
    }).collect() 

`

回答

3

您可以使用Hadoop的公用库(如果你正在使用SBT作为依赖manangement工具,加thath库到你的依赖)容易写数据到HDFS。这样,您可以创建一个文件系统对象:

private val fs = { 
    val conf = new Configuration() 
    FileSystem.get(conf) 
    } 

一定要与你的Hadoop集群信息(核心site.xml的,等等)

然后,你可以写配置文件系统,例如字符串路径(在你的情况下,你应该处理数据流),在HDFS如下:

@throws[IOException] 
    def writeAsString(hdfsPath: String, content: String) { 
    val path: Path = new Path(hdfsPath) 
    if (fs.exists(path)) { 
     fs.delete(path, true) 
    } 
    val dataOutputStream: FSDataOutputStream = fs.create(path) 
    val bw: BufferedWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, "UTF-8")) 
    bw.write(content) 
    bw.close 
    } 
0

你应该从官方文档看看方法saveAsTextFile:http://spark.apache.org/docs/latest/programming-guide.html

它可以让您保存到HDFS:

iter.saveAsTextFile("hdfs://...") 
+0

在该代码中iter不是RDD,所以不能写它。可能首先进行转换。 – dumitru

+0

是的,我认为我们会在这里很好。 RDD应该是数据类型来操纵火花以便在集群上获得分布式数据。 – chateaur

+0

这就是问题的症结所在。我已经尝试了所有我能想到的将我的数据传递给RDD以启用saveasTextFile的使用,但是结果很短。如果有人已经解决了这个问题,请让我知道 – user2699504

0

你可以试试saveAsTextFile方法。

将数据集的元素作为文本文件(或文本文件集)写入本地文件系统,HDFS或任何其他Hadoop支持的文件系统中的给定目录中。 Spark将在每个元素上调用toString将其转换为文件中的一行文本。

它会将每个分区保存为一个不同的文件。除非您重新分区或合并,否则最终将使用的分区数将与输入文件数相同。

+0

请看我上面的评论,为什么使用saveasTextFile是一个问题 – user2699504

+0

不能你可以写整个RDD而不是单独的每个文件。而不是收集使用saveAsText文件? – NetanelRabinowitz

+0

将所有解压缩数据连接成一个文件。这不是我想要的。我希望每个解压缩文件都在它自己的单独文件中 – user2699504

0
sc.binaryFiles("/user/example/zip_dir", 10)        //make an RDD from *.zip files in HDFS 
      .flatMap((file: (String, PortableDataStream)) => {     //flatmap to unzip each file 
       val zipStream = new ZipInputStream(file._2.open)    //open a java.util.zip.ZipInputStream 
       val entry = zipStream.getNextEntry        //get the first entry in the stream 
       val iter = Source.fromInputStream(zipStream).getLines   //place entry lines into an iterator 
       iter.next              //pop off the iterator's first line 
       iter               //return the iterator 
      }) 
      .saveAsTextFile("/user/example/quoteTable_csv/result.csv")