2010-11-23 103 views
35

如何用Scala Stream读取大型CSV文件(> 1 Gb)?你有代码示例吗?或者你会用不同的方式来读取一个大的CSV文件,而不是先把它加载到内存中?如何使用Scala Stream类读取大型CSV文件?

+0

你的意思是流如在懒惰评估功能?这大概可能,但不是必需的? - 逐行读取文件实质上已经是了。我对Scala io的速度还不是很快,但getLines(从源代码快速浏览)也是以懒惰的方式实现的 - 是否将所有文件读入内存? – 2010-11-23 10:46:36

+0

我相信它会读入内存,因为当使用scala.Source.fromFile(),然后getLines()时会出现OutOfMemoryException。所以使用Stream类听起来像是一个有效的选择,对吧? – 2010-11-23 10:49:17

回答

62

只要使用Source.fromFile(...).getLines就像你已经说过的那样。

返回一个Iterator,这已经是懒惰(你会使用流作为在那里你就想以前提取的值进行memoized一个懒惰的集合,这样你就可以再次阅读)

如果您收到内存问题,那么问题将出现在你正在做的之后 getLines。像toList这样的任何操作都会导致严重的收集问题。

10

我希望你不是指斯卡拉的collection.immutable.Stream与流。这是而不是你想要什么。流是懒惰的,但会记忆。

我不知道你打算做什么,但只是逐行阅读文件应该工作得很好,而不使用大量的内存。

getLines应评估懒惰,不应该崩溃(只要你的文件没有超过2³²线,afaik)。如果是这样,请询问#scala或提交错误消息(或两者兼而有之)。

3

如果您希望一行一行地处理大文件,同时避免要求将整个文件的内容一次加载到内存中,则可以使用由scala.io.Source返回的Iterator

我有一个小功能,tryProcessSource,(包含两个子功能),我正是用这些类型的用例。该功能最多需要四个参数,其中只有第一个参数是必需的。其他参数提供了理智的默认值。

这里的功能模式(全功能的实现是在底部):

def tryProcessSource(
    file: File, 
    parseLine: (Int, String) => Option[List[String]] = 
    (index, unparsedLine) => Some(List(unparsedLine)), 
    filterLine: (Int, List[String]) => Option[Boolean] = 
    (index, parsedValues) => Some(true), 
    retainValues: (Int, List[String]) => Option[List[String]] = 
    (index, parsedValues) => Some(parsedValues), 
): Try[List[List[String]]] = { 
    ??? 
} 

第一个参数,file: File,是必需的。它只是java.io.File的任何有效实例,它指向一个面向行的文本文件,如CSV。

第二个参数parseLine: (Int, String) => Option[List[String]]是可选的。如果提供,它必须是一个函数,期望接收两个输入参数; index: IntunparsedLine: String。然后返回Option[List[String]]。该函数可能会返回由有效列值组成的包装List[String]Some。或者它可能会返回一个None,表示整个流式处理过程正在中止。如果未提供此参数,则提供缺省值(index, line) => Some(List(line))。该缺省结果将整个行作为单个String值返回。

第三个参数filterLine: (Int, List[String]) => Option[Boolean]是可选的。如果提供,它必须是一个函数,期望接收两个输入参数; index: IntparsedValues: List[String]。然后返回Option[Boolean]。该函数可能会返回一个Some包装Boolean,指示是否应将此特定行包含在输出中。或者它可能会返回一个None,表示整个流式处理过程正在中止。如果未提供此参数,则提供缺省值(index, values) => Some(true)。此默认结果包含所有行。

第四个也是最后一个参数retainValues: (Int, List[String]) => Option[List[String]]是可选的。如果提供,它必须是一个函数,期望接收两个输入参数; index: IntparsedValues: List[String]。然后返回Option[List[String]]。该函数可能会返回一个包含List[String]的包装List[String],其中包含一些子集和/或现有列值的更改。或者它可能会返回一个None,表示整个流式处理过程正在中止。如果未提供此参数,则提供缺省值(index, values) => Some(values)。此默认值导致由第二个参数parseLine解析的值。

考虑采用以下内容的文件(4号线):

street,street2,city,state,zip 
100 Main Str,,Irving,TX,75039 
231 Park Ave,,Irving,TX,75039 
1400 Beltline Rd,Apt 312,Dallas,Tx,75240 

下调用资料...

val tryLinesDefaults = 
    tryProcessSource(new File("path/to/file.csv")) 

......结果这个输出tryLinesDefaults(文件内容不变):

Success(
    List(
    List("street,street2,city,state,zip"), 
    List("100 Main Str,,Irving,TX,75039"), 
    List("231 Park Ave,,Irving,TX,75039"), 
    List("1400 Beltline Rd,Apt 312,Dallas,Tx,75240") 
) 
) 

以下主叫轮廓...

val tryLinesParseOnly = 
    tryProcessSource(
     new File("path/to/file.csv") 
    , parseLine = 
     (index, unparsedLine) => Some(unparsedLine.split(",").toList) 
) 

...结果在此输出tryLinesParseOnly(解析成各个列值的每一行):

Success(
    List(
    List("street","street2","city","state","zip"), 
    List("100 Main Str","","Irving,TX","75039"), 
    List("231 Park Ave","","Irving","TX","75039"), 
    List("1400 Beltline Rd","Apt 312","Dallas","Tx","75240") 
) 
) 

的以下呼叫简介...

val tryLinesIrvingTxNoHeader = 
    tryProcessSource(
     new File("C:/Users/Jim/Desktop/test.csv") 
    , parseLine = 
     (index, unparsedLine) => Some(unparsedLine.split(",").toList) 
    , filterLine = 
     (index, parsedValues) => 
      Some(
      (index != 0) && //skip header line 
      (parsedValues(2).toLowerCase == "Irving".toLowerCase) && //only Irving 
      (parsedValues(3).toLowerCase == "Tx".toLowerCase) 
     ) 
) 

......结果这个输出tryLinesIrvingTxNoHeader(每行解析成单独的列值,没有头,只有在欧文的两排,TX):

Success(
    List(
    List("100 Main Str","","Irving,TX","75039"), 
    List("231 Park Ave","","Irving","TX","75039"), 
) 
) 

这里就是整个tryProcessSource功能的实现:

import scala.io.Source 
import scala.util.Try 

import java.io.File 

def tryProcessSource(
    file: File, 
    parseLine: (Int, String) => Option[List[String]] = 
    (index, unparsedLine) => Some(List(unparsedLine)), 
    filterLine: (Int, List[String]) => Option[Boolean] = 
    (index, parsedValues) => Some(true), 
    retainValues: (Int, List[String]) => Option[List[String]] = 
    (index, parsedValues) => Some(parsedValues) 
): Try[List[List[String]]] = { 
    def usingSource[S <: Source, R](source: S)(transfer: S => R): Try[R] = 
    try {Try(transfer(source))} finally {source.close()} 
    def recursive(
    remaining: Iterator[(String, Int)], 
    accumulator: List[List[String]], 
    isEarlyAbort: Boolean = 
     false 
): List[List[String]] = { 
    if (isEarlyAbort || !remaining.hasNext) 
     accumulator 
    else { 
     val (line, index) = 
     remaining.next 
     parseLine(index, line) match { 
     case Some(values) => 
      filterLine(index, values) match { 
      case Some(keep) => 
       if (keep) 
       retainValues(index, values) match { 
        case Some(valuesNew) => 
        recursive(remaining, valuesNew :: accumulator) //capture values 
        case None => 
        recursive(remaining, accumulator, isEarlyAbort = true) //early abort 
       } 
       else 
       recursive(remaining, accumulator) //discard row 
      case None => 
       recursive(remaining, accumulator, isEarlyAbort = true) //early abort 
      } 
     case None => 
      recursive(remaining, accumulator, isEarlyAbort = true) //early abort 
     } 
    } 
    } 
    Try(Source.fromFile(file)).flatMap(
    bufferedSource => 
     usingSource(bufferedSource) { 
     source => 
      recursive(source.getLines().buffered.zipWithIndex, Nil).reverse 
     } 
) 
} 

虽然这种解决方案是比较简洁的,我花了大量的时间和许多重构经过我终于能去这里之前。请让我知道,如果你看到任何可能改进的方式。


更新:我刚刚问过下面的问题为it's own StackOverflow question。现在它在has an answer fixing the error下面提到。

我有这样的想法,试图使这个更具通用性,将retainValues参数更改为transformLine,下面是新的泛型函数定义。但是,我不断得到IntelliJ中的突出显示错误“表达式类型Some [List [String]]不符合预期类型Option [A]”并且无法弄清楚如何更改默认值,因此错误消失了。

def tryProcessSource2[A <: AnyRef](
    file: File, 
    parseLine: (Int, String) => Option[List[String]] = 
    (index, unparsedLine) => Some(List(unparsedLine)), 
    filterLine: (Int, List[String]) => Option[Boolean] = 
    (index, parsedValues) => Some(true), 
    transformLine: (Int, List[String]) => Option[A] = 
    (index, parsedValues) => Some(parsedValues) 
): Try[List[A]] = { 
    ??? 
} 

任何有关如何使这项工作的援助将不胜感激。