2010-10-24 57 views
2

我有一个迭代器(实际上是一个Source.getLines),它从URL中读取无限数据流。当有连接问题时,偶尔迭代器会抛出java.io.IOException。在这种情况下,我需要重新连接并重新启动迭代器。我希望这是无缝的,以便迭代器看起来像一个正常的消费者迭代器,但在下面根据需要重新启动。重新启动迭代器在Scala中的异常

例如,我想看到以下行为:

scala> val iter = restartingIterator(() => new Iterator[Int]{ 
    var i = -1 
    def hasNext = { 
    if (this.i < 3) { 
     true 
    } else { 
     throw new IOException 
    } 
    } 
    def next = { 
    this.i += 1 
    i 
    } 
}) 
res0: ... 

scala> iter.take(6).toList 
res1: List[Int] = List(0, 1, 2, 3, 0, 1) 

我有一个部分解决了这个问题,但它会在某个角落的情况下失败(例如IOException异常后的第一项重新启动),它是相当难看:

def restartingIterator[T](getIter:() => Iterator[T]) = new Iterator[T] { 
    var iter = getIter() 
    def hasNext = { 
    try { 
     iter.hasNext 
    } catch { 
     case e: IOException => { 
     this.iter = getIter() 
     iter.hasNext 
     } 
    } 
    } 
    def next = { 
    try { 
     iter.next 
    } catch { 
     case e: IOException => { 
     this.iter = getIter() 
     iter.next 
     } 
    } 
    } 
} 

我一直感觉像有一个更好的解决了这个,也许一些Iterator.continuallyutil.control.Exception或类似的东西的组合,但我想不出一个出来。有任何想法吗?

+0

我添加了一个解决方案'持续'和'util.control.Exception'到我原来的答案。 – huynhjl 2010-11-20 12:07:58

回答

4

这是相当接近的版本,并使用scala.util.control.Exception

def restartingIterator[T](getIter:() => Iterator[T]) = new Iterator[T] { 
    import util.control.Exception.allCatch 
    private[this] var i = getIter() 
    private[this] def replace() = i = getIter() 
    def hasNext: Boolean = allCatch.opt(i.hasNext).getOrElse{replace(); hasNext} 
    def next(): T = allCatch.opt(i.next).getOrElse{replace(); next} 
} 

出于某种原因,这并不是尾递归,但它可以通过使用一个稍微详细的版本是固定的:

def restartingIterator2[T](getIter:() => Iterator[T]) = new Iterator[T] { 
    import util.control.Exception.allCatch 
    private[this] var i = getIter() 
    private[this] def replace() = i = getIter() 
    @annotation.tailrec def hasNext: Boolean = { 
    val v = allCatch.opt(i.hasNext) 
    if (v.isDefined) v.get else {replace(); hasNext} 
    } 
    @annotation.tailrec def next(): T = { 
    val v = allCatch.opt(i.next) 
    if (v.isDefined) v.get else {replace(); next} 
    } 
} 

编辑:有一个解决方案,与util.control.ExceptionIterator.continually

def restartingIterator[T](getIter:() => Iterator[T]) = { 
    import util.control.Exception.allCatch 
    var iter = getIter() 
    def f: T = allCatch.opt(iter.next).getOrElse{iter = getIter(); f} 
    Iterator.continually { f } 
} 
+0

是的,使它递归解决了我有点担心的角落案例。我想我可以通过将我的解决方案中的第二个“iter.hasNext”和“iter.next”更改为“this.hasNext”和“this.next”并添加talrec注释,获得几乎相同的行为。 我很希望有一个更简单的解决方案的基础上,但不知何故。 – Steve 2010-10-25 07:19:32

+0

非常酷。这正是我期待的那种,谢谢! – Steve 2010-11-21 15:04:38

+0

@ huynhji-我有点困惑的片段if(v.isDefined)v.get else {replace(); next}和if(v.isDefined)v.get else {replace(); hasNext}。如果出现异常,这两行不要将迭代器重置为开始。我试图理解它将如何跳过抛出异常的部分,并转移到它正在迭代的源的下一个元素? – 2013-06-09 21:02:23

2

有一个更好的解决方案中,Iteratee:

http://apocalisp.wordpress.com/2010/10/17/scalaz-tutorial-enumeration-based-io-with-iteratees/

这里是例如其上遇到的异常重新启动的枚举器。

def enumReader[A](r: => BufferedReader, it: IterV[String, A]): IO[IterV[String, A]] = { 
    val tmpReader = r 
    def loop: IterV[String, A] => IO[IterV[String, A]] = { 
    case [email protected](_, _) => IO { i } 
    case Cont(k) => for { 
     s <- IO { try { val x = tmpReader.readLine; IO(x) } 
       catch { case e => enumReader(r, it) }}.join 
     a <- if (s == null) k(EOF) else loop(k(El(s))) 
    } yield a 
    } 
    loop(it) 
} 

内循环推进了Iteratee,但外函数仍保留原来的。由于Iteratee是一个持久的数据结构,要重新启动,您只需再次调用该函数即可。

我在这里通过读卡器的名称,以便r本质上是一个功能,为您提供一个全新的(重新启动)读卡器。在实践中,您会希望更有效地将其括起来(关闭现有读者的例外情况)。

+0

有趣的文章,但它并没有真正谈论处理异常。你能详细说明你将如何使用scalaz迭代器来处理我的问题吗? – Steve 2010-10-25 07:11:56

+0

我盯着这个15分钟,但我仍然无法把头围住它。我认为这对我来说可能不好,即使/当我知道这些代码时...... – Steve 2010-10-25 13:53:13

+1

这篇文章解释了它。代码基本上是这样说的:要将Reader从Reader中提供给Iteratee,请检查它是否接受了输入。如果是,请将其退回。如果需要更多的输入,它将有一个函数'k'来接受输入。从阅读器读取一行并将其分配给's'。如果我们得到异常,请重新启动整个枚举。如果我们得到一个空行,向Iteratee发信号表示我们已经到达EOF。否则,将's'输入'k'并循环。 – Apocalisp 2010-10-25 17:56:55

1

这里是不起作用的答案,但感觉像它应该:

def restartingIterator[T](getIter:() => Iterator[T]): Iterator[T] = { 
    new Traversable[T] { 
    def foreach[U](f: T => U): Unit = { 
     try { 
     for (item <- getIter()) { 
      f(item) 
     } 
     } catch { 
     case e: IOException => this.foreach(f) 
     } 
    } 
    }.toIterator 
} 

我觉得这很清楚地描述了控制流,这是伟大的。

该代码将引发Scala中的一个StackOverflowError 2.8.0因为bug in Traversable.toStream的,但即使是修复该错误后,该代码仍然不会给我使用的情况下工作,因为toIterator电话toStream,这意味着它会将所有项目存储在内存中。

我希望能够通过编写foreach方法来定义Iterator,但似乎没有任何简单的方法可以做到这一点。