2017-08-29 53 views
2

我正在使用返回分页资源的服务。它暴露一个单个呼叫,它是由下面的接口定义:使用分页的Iterable [T],就好像它是连续的Iterable [T]

trait Service { 
    getPage(pageSize: Int, pageCursor: String): AsyncPage[Resource] 
} 

getPage函数返回一个AsyncPage[T]对象,这是这样实现的:

/** 
* A page of contents that are retrieved asynchronously from their origin 
* 
* @param content The resource object 
* @param nextPageCursor The token representing the next page, or empty if no more pages to consume 
* @tparam T The type of resource withing the page 
*/ 
case class AsyncPage[T](
    val content: Future[Iterable[T]], 
    val nextPageCursor : Future[String] 
) { } 

的页面的内容被异步检索来自服务使用的任何存储系统。

由于我的应用程序的需要,我并不关心页面。我想编写一些代码,使我可以像使用单一的Iterable[T]一样消耗服务的资源。

但是,我想保持服务的懒惰。我不想要求超过必要的页数。这意味着我不想要求下一个页面,直到我没有使用前一个元素。

每当我有一个消耗首页的整个Iterable[T],我想要的代码使用getPage(...)功能,并提供从nextPageCursor的最后一页的pageCursor参数要求下页。

你能指导我如何做到这一点吗?

+0

那么'Iterable [T]'会在这些'Future's块上呢?否则,我认为你可以做的最好的是'Iterable [Future [Iterable [T]]''。 –

+0

我的应用程序基于Akka actor模型框架,该框架不鼓励阻止actor主题并告诉您纯粹异步工作 –

回答

1

好吧,如果你不介意的话堵,你可以做这样的事情:

class FutureIter[+P](fu: => Future[Iterator[P]]) extends AbstractIterator[P] { 
    lazy val iter = Await.result(fu) 
    def hasNext = iter.hasNext 
    def next = iter.next 
} 

    def fold[T](fs: Stream[Future[Iterator[T]]]): Iterator[T]= fs match { 
    case hd #:: tail => new FutureIter(hd) ++ fold(tail) 
    case _ => Iterator.empty 
    } 

    val pages = Stream 
    .iterate(getPage(size, "")) { getPage(size, _.nextPageCursor) } 
    .map(_.contents.map(_.iterator)) 

    val result: Iterator[T] = fold(pages) 

这将在第一页前阻止,并在每个后续页面结束加载下一批次。我不认为有一种方法可以在没有阻塞的情况下做到这一点,因为在未来得到满足之前,您无法分辨网页的结束位置。

此外,请注意,此代码生成的迭代器是无限的,因为您没有提及任何标准何时停止查找更多页面。您可以将一些.takeWhile呼叫放在pages上以纠正该问题。

您可能还需要与Iterator,这样的网页你用得到立即丢弃,而不是让缓存进行更换Stream。我刚刚使用了Stream,因为这使得fold更好一点(你不能在迭代器上匹配,将不得不使用和丑陋的if(it.hasNext) ...)。

BTW,fold看起来是递归的,但它实际上是++很懒,所以fold(tail)件将不会被执行,直到你迭代一路走过的左手侧 - 好后外部的fold已经关闭。

+0

谢谢您的回答。我还没有尝试过你的解决方案,明天会做。然而,我一直在做一些研究,发现这个[链接](http://koff.io/posts/pagination-and-streams/),作者提出了三个实现将寻呼源转变为异步流。我也可以看到依靠锁定的递归版本。我不确定其他解决方案。正如我在另一条评论中发布的,我正在使用的应用程序使用Akka actor模型,该模型不鼓励锁定主线程(可能在不同的ExecutionContext中执行异步任务) –

+0

您绝对不应该锁定主线程。将整个东西卸载到后台线程上是通常的方法:'val result:Future [Iterator [T]] = Future(fold(pages))' – Dima

+0

您引用的链接中的代码并不完全符合您的要求问 - 它显示了遍历分页结果的不同方法(有很多),而不是将它们“拼合”成值的迭代器(这是遍历它们的许多方法之一)。 – Dima

0

既然你提到的阿卡,你可以创建一个Source[T]可以排序的是,虽然作为一种异步Iterable[T]

Source.unfoldAsync[String, T](startPageCursor) { cursor => 
    val page = getPage(pageSize, cursor) 
    for { 
    nextCursor <- page.nextPageCursor 
    it <- page.content 
    } yield Some((nextCursor, it)) 
}.mapConcat(identity) 

这是更清洁,完全无阻塞。但是,如果这是合适的,则取决于您的使用情况。