2014-02-18 85 views
3

我是Spark和Scala的新手。我们将广告活动日志文件格式化为CSV,然后使用pkzip进行压缩。我见过很多关于如何使用Java解压压缩文件的例子,但是我如何使用Scala为Spark做这件事?我们最终想要从每个传入文件中获取,提取和加载数据到Hbase目标表中。也许这可以通过HadoopRDD完成?在此之后,我们将介绍Spark流媒体观看这些文件。Spark/Scala打开压缩的CSV文件

感谢, 本

回答

4

火花,只要你的文件有正确的文件名后缀(如。广州的gzip压缩的),并且它是由org.apache.hadoop.io.compress.CompressionCodecFactory支持,那么你可以只用

sc.textFile(path) 

UPDATE :在编写它们时,它是Hadoop bzip2库中的一个错误,这意味着尝试使用spark来读取bzip2文件导致奇怪的异常 - 通常是ArrayIndexOutOfBounds。

+0

它对我不起作用。我有一个压缩文件(使用.zip扩展名)并执行'sc.textFile(path)'引发异常... – mgaido

0

默认压缩支持

@samthebest答案是正确的,如果你使用的压缩格式,在星火可用默认(Hadoop的)。它们是:

  • bzip2的
  • 的gzip
  • LZ4
  • 活泼

我在对方的回答解释了这个话题更深层次的:https://stackoverflow.com/a/45958182/1549135

阅读拉链

但是,如果您正尝试读取需要创建自定义解决方案的zip文件。其中一个在我已经提供的答案中提到。

如果你需要从存档读取多个文件时,你可能会感兴趣的我已经提供了答案:https://stackoverflow.com/a/45958458/1549135

基本上,所有的时间,使用sc.binaryFiles后来就解压PortableDataStream,如样品中:

sc.binaryFiles(path, minPartitions) 
    .flatMap { case (name: String, content: PortableDataStream) => 
    val zis = new ZipInputStream(content.open) 
    Stream.continually(zis.getNextEntry) 
      .takeWhile(_ != null) 
      .flatMap { _ => 
       val br = new BufferedReader(new InputStreamReader(zis)) 
       Stream.continually(br.readLine()).takeWhile(_ != null) 
      }