我是Spark和Scala的新手。我们将广告活动日志文件格式化为CSV,然后使用pkzip进行压缩。我见过很多关于如何使用Java解压压缩文件的例子,但是我如何使用Scala为Spark做这件事?我们最终想要从每个传入文件中获取,提取和加载数据到Hbase目标表中。也许这可以通过HadoopRDD完成?在此之后,我们将介绍Spark流媒体观看这些文件。Spark/Scala打开压缩的CSV文件
感谢, 本
我是Spark和Scala的新手。我们将广告活动日志文件格式化为CSV,然后使用pkzip进行压缩。我见过很多关于如何使用Java解压压缩文件的例子,但是我如何使用Scala为Spark做这件事?我们最终想要从每个传入文件中获取,提取和加载数据到Hbase目标表中。也许这可以通过HadoopRDD完成?在此之后,我们将介绍Spark流媒体观看这些文件。Spark/Scala打开压缩的CSV文件
感谢, 本
火花,只要你的文件有正确的文件名后缀(如。广州的gzip压缩的),并且它是由org.apache.hadoop.io.compress.CompressionCodecFactory
支持,那么你可以只用
sc.textFile(path)
UPDATE :在编写它们时,它是Hadoop bzip2库中的一个错误,这意味着尝试使用spark来读取bzip2文件导致奇怪的异常 - 通常是ArrayIndexOutOfBounds。
@samthebest答案是正确的,如果你使用的压缩格式,在星火可用默认(Hadoop的)。它们是:
我在对方的回答解释了这个话题更深层次的:https://stackoverflow.com/a/45958182/1549135
但是,如果您正尝试读取需要创建自定义解决方案的zip
文件。其中一个在我已经提供的答案中提到。
如果你需要从存档读取多个文件时,你可能会感兴趣的我已经提供了答案:https://stackoverflow.com/a/45958458/1549135
基本上,所有的时间,使用sc.binaryFiles
后来就解压PortableDataStream
,如样品中:
sc.binaryFiles(path, minPartitions)
.flatMap { case (name: String, content: PortableDataStream) =>
val zis = new ZipInputStream(content.open)
Stream.continually(zis.getNextEntry)
.takeWhile(_ != null)
.flatMap { _ =>
val br = new BufferedReader(new InputStreamReader(zis))
Stream.continually(br.readLine()).takeWhile(_ != null)
}
它对我不起作用。我有一个压缩文件(使用.zip扩展名)并执行'sc.textFile(path)'引发异常... – mgaido