我正在写一个Spark/Scala程序来读取ZIP文件,将它们解压缩并将内容写入一组新文件。我可以将其写入本地文件系统,但是想知道是否有办法将输出文件写入分布式文件系统(如HDFS)。代码显示below`在Spark/Scala中写入HDFS
import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream
import java.io._
var i =1
sc.binaryFiles("file:///d/tmp/zips/").flatMap((file: (String, PortableDataStream)) =>
{
val zipStream = new ZipInputStream(file._2.open)
val entry = zipStream.getNextEntry
val iter = scala.io.Source.fromInputStream(zipStream).getLines
val fname = f"/d/tmp/myfile$i.txt"
i = i + 1
val xx = iter.mkString
val writer = new PrintWriter(new File(fname))
writer.write(xx)
writer.close()
iter
}).collect()
`
在该代码中iter不是RDD,所以不能写它。可能首先进行转换。 – dumitru
是的,我认为我们会在这里很好。 RDD应该是数据类型来操纵火花以便在集群上获得分布式数据。 – chateaur
这就是问题的症结所在。我已经尝试了所有我能想到的将我的数据传递给RDD以启用saveasTextFile的使用,但是结果很短。如果有人已经解决了这个问题,请让我知道 – user2699504