2011-11-22 76 views
6

我有很大的Iterator,我想将它们分割成几部分。我有一个谓词来看一个项目,如果它是一个新片段的开始,则返回true。我需要这些片断作为迭代器,因为即使这些片断也不适合内存。有很多片断,我会警惕递归解决方案吹出你的堆栈。这种情况与this question类似,但我需要迭代器而不是列表,并且在一个片段的开头处出现(并且应该包括谓词)的“哨兵”(谓词为真的项目)。生成的迭代器只能按顺序使用,尽管有些可能完全不能使用,并且它们只应使用O(1)内存。我想这意味着他们应该都共享相同的基础迭代器。性能很重要。Scala:通过谓词将Iterable组合成一个Iterable Iterable

如果我拿在函数签名刺,这将是这样的:

def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] = ... 

我会喜欢使用takeWhile,但它失去了最后一个元素。我调查了span,但它缓冲了结果。我目前最好的想法涉及BufferedIterator,但也许有更好的方法。

你会知道你有它正确的,因为这样的事情不会崩溃您的JVM:

groupby((1 to Int.MaxValue).iterator)(_ % (Int.MaxValue/2) == 0).foreach(group => println(group.sum)) 
groupby((1 to Int.MaxValue).iterator)(_ % 10 == 0).foreach(group => println(group.sum)) 
+0

见http://stackoverflow.com/questions/5410846/how-do-i-apply-the-pimp-my-library-pattern-to-scala-collections/5411133#5411133 – huynhjl

回答

5

这是我的解决方案,使用BufferedIterator。它不会让你跳过正确的迭代器,但它相当简单和功能。即使!startsGroup(first)第一个元素进入一个组。

def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] = 
    new Iterator[Iterator[T]] { 
    val base = iter.buffered 
    override def hasNext = base.hasNext 
    override def next() = Iterator(base.next()) ++ new Iterator[T] { 
     override def hasNext = base.hasNext && !startsGroup(base.head) 
     override def next() = if (hasNext) base.next() else Iterator.empty.next() 
    } 
    } 

更新:保持一点点的状态,您可以跳过迭代器和防止人们从以前的插科打诨:

def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] = 
new Iterator[Iterator[T]] { 
    val base = iter.buffered 
    var prev: Iterator[T] = Iterator.empty 
    override def hasNext = base.hasNext 
    override def next() = { 
    while (prev.hasNext) prev.next()  // Exhaust previous iterator; take* and drop* do NOT always work!! (Jira SI-5002?) 
    prev = Iterator(base.next()) ++ new Iterator[T] { 
     var hasMore = true 
     override def hasNext = { hasMore = hasMore && base.hasNext && !startsGroup(base.head) ; hasMore } 
     override def next() = if (hasNext) base.next() else Iterator.empty.next() 
    } 
    prev 
    } 
} 
5

你有一个固有的问题。 Iterable意味着你可以得到多个迭代器。 Iterator意味着你只能通过一次。这意味着您的Iterable[Iterable[T]]应该能够生产Iterator[Iterable[T]]。但是,当这返回一个元素 - 一个Iterable[T] - 并要求多个迭代器时,底层的单个迭代器无法缓存列表的结果(太大)调用原始迭代并绝对经历一切再次(非常低效)。

所以,虽然你可能做到这一点,我认为你应该以不同的方式构思你的问题。

如果您可以从Seq开始,您可以抓取子集作为范围。

如果你已经知道你是怎么想使用你的迭代器,你可以写一个方法

def process[T](source: Iterable[T])(starts: T => Boolean)(handlers: T => Unit *) 

它通过一套处理程序每​​次递增starts打完一个“真”。如果有什么办法可以在一次扫描中完成处理,那么类似这样的工作就是要走的路。 (然而,你的处理程序将不得不通过可变数据结构或变量来保存状态。)

如果你可以允许在外层列表中迭代内层列表,你可以有一个​​带有额外的约束,一旦迭代到后面的子迭代器,以前的所有子迭代器都是无效的。


这里的最后一种类型中的溶液(从Iterator[T]Iterator[Iterator[T]];一个可以换这使外层Iterable代替)。

class GroupedBy[T](source: Iterator[T])(starts: T => Boolean) 
extends Iterator[Iterator[T]] { 
    private val underlying = source 
    private var saved: T = _ 
    private var cached = false 
    private var starting = false 
    private def cacheNext() { 
    saved = underlying.next 
    starting = starts(saved) 
    cached = true 
    } 
    private def oops() { throw new java.util.NoSuchElementException("empty iterator") } 
    // Comment the next line if you do NOT want the first element to always start a group 
    if (underlying.hasNext) { cacheNext(); starting = true } 
    def hasNext = { 
    while (!(cached && starting) && underlying.hasNext) cacheNext() 
    cached && starting 
    } 
    def next = { 
    if (!(cached && starting) && !hasNext) oops() 
    starting = false 
    new Iterator[T] { 
     var presumablyMore = true 
     def hasNext = { 
     if (!cached && !starting && underlying.hasNext && presumablyMore) cacheNext() 
     presumablyMore = cached && !starting 
     presumablyMore 
     } 
     def next = { 
     if (presumablyMore && (cached || hasNext)) { 
      cached = false 
      saved 
     } 
     else oops() 
     } 
    } 
    } 
} 
+1

'迭代器[迭代器[T]]'没关系;无论如何,我的基础迭代器只能并且应该只允许一次传递。我想跳过子迭代器来使以前的子迭代器失效。我不知道时间长短,所以'Seq'是不可能的。我知道我想如何使用我的迭代器,但我认为这样的函数通常会很有用。 –

1

如果您正在查看内存限制,则以下操作将起作用。只有基础的可迭代对象支持视图时才能使用它。这个实现将遍历Iterable,然后生成可以迭代的IterableViews。这个实现不关心第一个元素是否作为开始组进行测试,因为它不管用。

def groupby[T](iter: Iterable[T])(startsGroup: T => Boolean): Iterable[Iterable[T]] = new Iterable[Iterable[T]] { 
    def iterator = new Iterator[Iterable[T]] { 
    val i = iter.iterator 
    var index = 0 
    var nextView: IterableView[T, Iterable[T]] = getNextView() 
    private def getNextView() = { 
     val start = index 
     var hitStartGroup = false 
     while (i.hasNext && ! hitStartGroup) { 
     val next = i.next() 
     index += 1 
     hitStartGroup = (index > 1 && startsGroup(next)) 
     } 
     if (hitStartGroup) { 
     if (start == 0) iter.view(start, index - 1) 
     else iter.view(start - 1, index - 1) 
     } else { // hit end 
     if (start == index) null 
     else if (start == 0) iter.view(start, index) 
     else iter.view(start - 1, index) 
     } 
    } 
    def hasNext = nextView != null 
    def next() = { 
     if (nextView != null) { 
     val next = nextView 
     nextView = getNextView() 
     next 
     } else null 
    } 
    } 
} 
+0

修复了答案代码。它在getNextView中缺少“if(start == index)null”的情况 –

1

您可以通过使用流保持低内存足迹。使用result.toIterator,如果你是一个迭代器。

使用流,没有可变状态,只有一个条件,它几乎与Jay Hacker的解决方案一样简洁。

def batchBy[A,B](iter: Iterator[A])(f: A => B): Stream[(B, Iterator[A])] = { 
    val base = iter.buffered 
    val empty = Stream.empty[(B, Iterator[A])] 

    def getBatch(key: B) = { 
     Iterator(base.next()) ++ new Iterator[A] { 
     def hasNext: Boolean = base.hasNext && (f(base.head) == key) 
     def next(): A = base.next() 
     } 
    } 

    def next(skipList: Option[Iterator[A]] = None): Stream[(B, Iterator[A])] = { 
     skipList.foreach{_.foreach{_=>}} 

     if (base.isEmpty) empty 
     else { 
     val key = f(base.head) 
     val batch = getBatch(key) 

     Stream.cons((key, batch), next(Some(batch))) 
     } 
    } 

    next() 
    } 

我跑测试:

scala> batchBy((1 to Int.MaxValue).iterator)(_ % (Int.MaxValue/2) == 0) 
     .foreach{case(_,group) => println(group.sum)} 
-1610612735 
1073741823 
-536870909 
2147483646 
2147483647 

第二测试打印太多粘贴到堆栈溢出。

0
import scala.collection.mutable.ArrayBuffer 

object GroupingIterator { 

    /** 
    * Create a new GroupingIterator with a grouping predicate. 
    * 
    * @param it The original iterator 
    * @param p Predicate controlling the grouping 
    * @tparam A Type of elements iterated 
    * @return A new GroupingIterator 
    */ 
    def apply[A](it: Iterator[A])(p: (A, IndexedSeq[A]) => Boolean): GroupingIterator[A] = 
    new GroupingIterator(it)(p) 
} 

/** 
* Group elements in sequences of contiguous elements that satisfy a predicate. The predicate 
* tests each single potential next element of the group with the help of the elements grouped so far. 
* If it returns true, the potential next element is added to the group, otherwise 
* a new group is started with the potential next element as first element 
* 
* @param self The original iterator 
* @param p Predicate controlling the grouping 
* @tparam A Type of elements iterated 
*/ 
class GroupingIterator[+A](self: Iterator[A])(p: (A, IndexedSeq[A]) => Boolean) extends Iterator[IndexedSeq[A]] { 

    private[this] val source = self.buffered 
    private[this] val buffer: ArrayBuffer[A] = ArrayBuffer() 

    def hasNext: Boolean = source.hasNext 

    def next(): IndexedSeq[A] = { 
    if (hasNext) 
     nextGroup() 
    else 
     Iterator.empty.next() 
    } 

    private[this] def nextGroup(): IndexedSeq[A] = { 
    assert(source.hasNext) 

    buffer.clear() 
    buffer += source.next 

    while (source.hasNext && p(source.head, buffer)) { 
     buffer += source.next 
    } 

    buffer.toIndexedSeq 
    } 
}