2016-09-26

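I am using Play Framework Iteratees to read a file, and I want to process the file chunk by chunk at each step of the pipeline. How can I read and process a file chunk by chunk with Play Iteratees?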

I compose the following steps:

  • groupByLines: Enumeratee[Array[Byte], List[String]]
  • turnIntoLines: Enumeratee[List[String], List[Line]] (I define case class Line(number: Int, value: String))
  • parseChunk: Iteratee[List[Line], Try[List[T]]] (e.g. CSV parsing)

To define groupByLines, I need Iteratee.fold so that the last line of the previous chunk can be concatenated with the first line of the current chunk.
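The carry-over itself does not depend on Play. A minimal plain-Scala sketch, assuming the chunks have already been decoded to Strings; `LineSplitter`, `step` and `splitAll` are hypothetical names, not Play API:

```scala
object LineSplitter {
  /** One step: returns (lines completed by this chunk, unfinished tail to carry over). */
  def step(carry: String, chunk: String): (List[String], String) = {
    // limit -1 keeps a trailing "" when the chunk ends exactly on '\n',
    // so the carried-over tail is then empty
    val pieces = (carry + chunk).split("\n", -1).toList
    (pieces.init, pieces.last)
  }

  /** Runs `step` over all chunks; a non-empty tail at the end is the file's last line. */
  def splitAll(chunks: List[String]): List[String] = {
    val (lines, tail) = chunks.foldLeft((Vector.empty[String], "")) {
      case ((acc, carry), chunk) =>
        val (done, rest) = step(carry, chunk)
        (acc ++ done, rest)
    }
    (if (tail.isEmpty) lines else lines :+ tail).toList
  }
}
```

`step` is the per-chunk function a fold or scan would call; `splitAll` only runs it over a whole List to show the result, e.g. the chunks `"ab\ncd"`, `"ef\ng"`, `"h\n"` give the lines `ab`, `cdef`, `gh`.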

The problem is that this fold produces a single chunk containing every line of the file.

But I want to process the file chunk by chunk: groupByLines should emit chunks of, say, 200 lines.
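Why a fold yields one group follows from what `Enumeratee.grouped` does: it feeds input to its inner iteratee until that iteratee is done, emits the result, and restarts it on the remaining input. `Iteratee.fold` is only done at end of input, so there is exactly one group. The plain-Scala model below (not Play's actual implementation; `GroupedModel`, `foldAll` and `takeN` are hypothetical) contrasts a fold-like consumer with one that stops after n elements:

```scala
object GroupedModel {
  /** A consumer reads a prefix of the input and returns (its result, the unread rest). */
  type Consumer[A, B] = List[A] => (B, List[A])

  /** Restarts `consumer` on the remaining input until it is exhausted; one output per run. */
  def grouped[A, B](input: List[A])(consumer: Consumer[A, B]): List[B] = {
    @annotation.tailrec
    def loop(rest: List[A], acc: List[B]): List[B] =
      if (rest.isEmpty) acc.reverse
      else {
        val (result, remaining) = consumer(rest)
        loop(remaining, result :: acc)
      }
    loop(input, Nil)
  }

  /** Fold-like: only finishes at end of input, so it consumes everything in one run. */
  def foldAll[A]: Consumer[A, List[A]] = in => (in, Nil)

  /** Stops after n elements, leaving the rest for the next run. */
  def takeN[A](n: Int): Consumer[A, List[A]] = {
    require(n > 0, "n must be positive, otherwise grouped would never advance")
    in => in.splitAt(n)
  }
}
```

With 450 lines, `foldAll` gives one group while `takeN(200)` gives groups of 200, 200 and 50. With Play's own API the corresponding inner iteratee would presumably be `Enumeratee.take[String](200) &>> Iteratee.getChunks[String]`, handed to `Enumeratee.grouped`; that combination is an untested suggestion, not something from the original thread.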

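The same problem occurs in turnIntoLines, where I also use fold to build the lines: I need the accumulator provided by fold to zip the line numbers with the line contents.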
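For the numbering, the accumulator only needs to carry the next line number, not all previous lines. A plain-Scala sketch, assuming the lines already arrive in batches: `Iterator.scanLeft` threads the counter through the batches and still emits one numbered batch per input batch, so nothing has to wait for the end of the file. `Numbering` and `numberBatches` are hypothetical names:

```scala
// Mirrors the Line case class from the question.
final case class Line(number: Int, value: String)

object Numbering {
  /** Numbers the lines of each batch, continuing the count across batches. */
  def numberBatches(batches: Iterator[List[String]], first: Int): Iterator[List[Line]] =
    batches
      .scanLeft((first, List.empty[Line])) { case ((next, _), batch) =>
        // `next` is the number of the first line in this batch
        val numbered = batch.zipWithIndex.map { case (value, i) => Line(next + i, value) }
        (next + batch.size, numbered)
      }
      .drop(1) // Iterator.scanLeft emits the seed first; skip it
      .map(_._2)
}
```

For the batches `List("a", "b")` and `List("c")` starting at 1, this yields `Line(1, "a"), Line(2, "b")` and then `Line(3, "c")`.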

I'm a beginner with Play Iteratees.

Here is my code:

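import java.io.File
import scala.concurrent.ExecutionContext.Implicits.global
import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee}

case class Line(number: Int, value: String)

val chunkSize = 1024 * 8

// `file: File` is defined elsewhere
val enumerator: Enumerator[Array[Byte]] = Enumerator.fromFile(file, chunkSize)

// Heuristic: a chunk shorter than chunkSize is taken to be the last one
// (this can lose the final line when the file size is an exact multiple of chunkSize)
def isLastChunk(chunk: Array[Byte]): Boolean =
  chunk.length < chunkSize

// Iteratee.fold is only Done at EOF, so Enumeratee.grouped runs it once
// and emits a single List[String] holding every line of the file.
val groupByLines: Enumeratee[Array[Byte], List[String]] = Enumeratee.grouped[Array[Byte]] {
  println("groupByLines")
  Iteratee.fold[Array[Byte], (String, List[String])](("", List.empty[String])) {
    case ((accLast, accLines), chunk) =>
      println("groupByLines chunk size " + chunk.length)
      // no trim: limit -1 keeps a trailing "" when the chunk ends exactly on '\n'
      new String(chunk).split("\n", -1).toList match {
        case h :: tail =>
          // join the unfinished line of the previous chunk with this chunk's first piece
          val lineBetween2Chunks: String = accLast + h
          val all = lineBetween2Chunks :: tail
          val goodLines =
            if (!isLastChunk(chunk)) all.init // last piece may be incomplete: carry it over
            else if (all.last.isEmpty) all.init // file ended with '\n': drop the empty tail
            else all
          (all.last, accLines ++ goodLines)
        case Nil => (accLast, accLines) // unreachable: split returns at least one element
      }
  }.map(_._2)
}

// Same issue: this fold is also Done only at EOF, so it yields a single group.
val turnIntoLines: Enumeratee[List[String], List[Line]] = Enumeratee.grouped[List[String]] {
  println("turnIntoLines")
  Iteratee.fold[List[String], (Int, List[Line])]((0, List.empty[Line])) {
    case ((index, accLines), chunk) =>
      println("turnIntoLines chunk size " + chunk.length)
      // number the lines starting from the running index kept in the accumulator
      val lines = chunk.zipWithIndex.map {
        case (content, i) => Line(index + i, content)
      }
      (index + chunk.length, accLines ++ lines) // append, to keep file order
  }.map(_._2)
}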

Answer

The problem here is how to process a file line by line using Play Iteratees.

First, to read a file as UTF-8, I use:

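import java.io.File
import java.nio.charset.StandardCharsets
import scala.concurrent.ExecutionContext
import scala.language.implicitConversions
import play.api.libs.iteratee.Enumerator

object EnumeratorAdditionalOperators {
  implicit def enumeratorAdditionalOperators(e: Enumerator.type): EnumeratorAdditionalOperators =
    new EnumeratorAdditionalOperators(e)
}

class EnumeratorAdditionalOperators(e: Enumerator.type) {

  // Reads the file in chunkSize-byte chunks and decodes each chunk as UTF-8.
  // Caveat: a multi-byte character split across two chunks is not decoded correctly.
  def fromUTF8File(file: File, chunkSize: Int = 1024 * 8)(implicit ec: ExecutionContext): Enumerator[String] =
    e.fromFile(file, chunkSize)
      .map(bytes => new String(bytes, StandardCharsets.UTF_8))

}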
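Decoding each chunk independently, as `fromUTF8File` does, is only safe when no multi-byte UTF-8 character straddles a chunk boundary; with 8 KB chunks and non-ASCII text this can happen, and the character turns into replacement characters. One way to avoid it, sketched here with `java.nio.charset.CharsetDecoder` (the class `Utf8ChunkDecoder` is hypothetical and not part of Play), is to keep the incomplete trailing bytes of a chunk and prepend them to the next one:

```scala
import java.nio.{ByteBuffer, CharBuffer}
import java.nio.charset.{CharsetDecoder, CodingErrorAction, StandardCharsets}

final class Utf8ChunkDecoder {
  private val decoder: CharsetDecoder = StandardCharsets.UTF_8.newDecoder()
    .onMalformedInput(CodingErrorAction.REPLACE)
    .onUnmappableCharacter(CodingErrorAction.REPLACE)

  // bytes of an incomplete character left over from the previous chunk
  private var leftover: Array[Byte] = Array.emptyByteArray

  /** Decodes as much as possible; an incomplete trailing sequence is kept for later. */
  def decode(chunk: Array[Byte]): String = {
    val in = ByteBuffer.wrap(leftover ++ chunk)
    // UTF-8 never yields more chars than bytes, so this buffer cannot overflow
    val out = CharBuffer.allocate(in.remaining())
    decoder.decode(in, out, false) // endOfInput = false: stop before a partial sequence
    leftover = new Array[Byte](in.remaining())
    in.get(leftover)
    out.flip()
    out.toString
  }

  /** Call once at end of input; dangling bytes become U+FFFD. Resets for reuse. */
  def finish(): String = {
    val in = ByteBuffer.wrap(leftover)
    val out = CharBuffer.allocate(leftover.length + 1)
    decoder.decode(in, out, true)
    decoder.flush(out)
    decoder.reset()
    leftover = Array.emptyByteArray
    out.flip()
    out.toString
  }
}
```

The decoder is stateful, so one instance must not be shared between two runs of the same Enumerator; wiring it into an Enumeratee is left out here.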

Then, to split the input chunks into lines (cutting at '\n'):

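import scala.language.implicitConversions
import play.api.libs.iteratee.{Enumeratee, Iteratee, Traversable}

object EnumerateeAdditionalOperators {
  implicit def enumerateeAdditionalOperators(e: Enumeratee.type): EnumerateeAdditionalOperators =
    new EnumerateeAdditionalOperators(e)
}

class EnumerateeAdditionalOperators(e: Enumeratee.type) {

  // The inner iteratee passes characters through until the first '\n' and concatenates
  // them; grouped restarts it on the remaining input, so one String is emitted per line.
  def splitToLines: Enumeratee[String, String] = e.grouped[String](
    Traversable.splitOnceAt[String, Char](_ != '\n') &>>
      Iteratee.consume[String]()
  )

}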

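Third, to add line numbers, I used a piece of code found here: https://github.com/michaelahlers/michaelahlers-playful/blob/master/src/main/scala/ahlers/michael/playful/iteratee/EnumerateeFactoryOps.scala. As above, I use implicits to "add" methods to Enumerator and Enumeratee; the methods below belong to the same EnumerateeAdditionalOperators class: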

import scala.concurrent.ExecutionContext
import play.api.libs.iteratee.{Enumeratee, Enumerator, Input}

class EnumerateeAdditionalOperators(e: Enumeratee.type) {

  /**
   * As a complement to [[play.api.libs.iteratee.Enumeratee.heading]] and [[play.api.libs.iteratee.Enumeratee.trailing]], allows for inclusion of arbitrary elements between those from the producer.
   */
  def joining[E](separators: Enumerator[E])(implicit ec: ExecutionContext): Enumeratee[E, E] =
    zipWithIndex[E] compose Enumeratee.mapInputFlatten[(E, Int)] {

      case Input.Empty =>
        Enumerator.enumInput[E](Input.Empty)

      // every element after the first is preceded by the separators
      case Input.El((element, index)) if 0 < index =>
        separators andThen Enumerator(element)

      case Input.El((element, _)) =>
        Enumerator(element)

      case Input.EOF =>
        Enumerator.enumInput[E](Input.EOF)

    }

  /**
   * Zips elements with an index of the given [[scala.math.Numeric]] type, stepped by the given function.
   *
   * (Special thanks to [[https://github.com/eecolor EECOLOR]] for inspiring this factory with his answer to [[https://stackoverflow.com/a/27589990/700420 a question about enumeratees on Stack Overflow]].)
   */
  def zipWithIndex[E, I](first: I, step: I => I)(implicit ev: Numeric[I]): Enumeratee[E, (E, I)] =
    // the seed's index is one step before `first`, so the first element gets `first`
    e.scanLeft[E](null.asInstanceOf[E] -> ev.minus(first, step(ev.zero))) {
      case ((_, index), value) =>
        value -> step(index)
    }

  /**
   * Zips elements with an incrementing index of the given [[scala.math.Numeric]] type, adding one each time.
   */
  def zipWithIndex[E, I](first: I)(implicit ev: Numeric[I]): Enumeratee[E, (E, I)] = zipWithIndex(first, ev.plus(_, ev.one))

  /**
   * Zips elements with an incrementing index by the same contract [[scala.collection.GenIterableLike#zipWithIndex zipWithIndex]].
   */
  def zipWithIndex[E]: Enumeratee[E, (E, Int)] = zipWithIndex(0)

  // ...

}
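The stepped `zipWithIndex` relies on a small trick: the scan is seeded with a dummy element paired with `first - step(0)`, so the first real element receives `step(first - step(0))`. For an additive step such as `_ + 1` that equals `first`; for a non-additive step (e.g. doubling) it does not, because `step(0)` is then 0. The plain-Scala model below (hypothetical `ZipWithIndexModel`) reproduces the recipe with `Iterator.scanLeft`. It assumes, as the answer's code relies on, that Play's `Enumeratee.scanLeft` does not emit the seed, whereas `Iterator.scanLeft` does, so the model drops it explicitly:

```scala
object ZipWithIndexModel {
  /** Same scanLeft recipe as the Play version, applied to a plain Iterator. */
  def zipWithIndex[E, I](it: Iterator[E], first: I, step: I => I)(implicit ev: Numeric[I]): Iterator[(E, I)] =
    it.scanLeft(null.asInstanceOf[E] -> ev.minus(first, step(ev.zero))) {
      // the previous element is ignored; only its index is stepped
      case ((_, index), value) => value -> step(index)
    }.drop(1) // Iterator.scanLeft also emits the seed; drop it
}
```

With `step = _ + 1` and `first = 1`, the elements `a, b, c` get `1, 2, 3`; with `step = _ * 2` and `first = 1`, the first element gets 2 rather than 1, which is why only additive steps behave as the Scaladoc suggests.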

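Note: this implicit-conversion trick is what allows writing, for example, Enumerator.fromUTF8File(file), once EnumeratorAdditionalOperators._ is imported.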

Putting it all together:

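import java.io.File
import scala.concurrent.ExecutionContext.Implicits.global
import play.api.libs.iteratee.{Enumeratee, Enumerator}
import EnumeratorAdditionalOperators._
import EnumerateeAdditionalOperators._

case class Line(number: Int, value: String)

val file = new File("data.csv") // hypothetical input file

// &> binds more loosely than ><>, so the enumeratees are composed first
val lines: Enumerator[Line] =
  Enumerator.fromUTF8File(file) &>
    Enumeratee.splitToLines ><>
    Enumeratee.zipWithIndex[String] ><> // numbers from 0; zipWithIndex[String, Int](1) starts at 1
    Enumeratee.map[(String, Int)] { case (e, idx) => Line(idx, e) }
// then an Iteratee or another Enumeratee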

The new code is more concise and clearer than the code given in the question.