2016-10-25 51 views
2

我正在使用scalikejdbc来访问一个巨大的表。我的理解是,在我映射或迭代它们之前,它会将所有行存入内存。Scalikejdbc结果集迭代器

目前我有一个使用rxscala Observable的实现,它非常简单。但接收器比读取sql慢,然后由于缓冲而获得OutOfMemory。这是我目前的制片人作为可以观察到:

def fetchProductsAsObservable(
    sql: SQL[Nothing,NoExtractor], 
    extractor: (WrappedResultSet) => ProductItem) 
) = 
    Observable[ProductItem](o => 
     try { 
      sql.foreach(row => o.onNext(extractor(row))) 
      o.onCompleted() 
     } catch { 
     case e: Throwable => o.onError(e) 
     } 
    ) 

我知道SQL.foreach方法,但它得到一个回调方法,并返回单位。 我的背景是在.NET中。无法弄清楚自己如何在scala中使用scalikejdbc正确实现一个简单的Iterator,我可以在这个平台上进行处理?

+0

小心显示部分代码? – mfirry

+0

当然,编辑问题 –

回答

-1

您的问题与ScalikeJDBC行为无关,但与RxJava无关。 由于您的Observable是由热源提供的,因此您需要使用背压策略。

+0

谢谢你的最新答案。我的实际背压策略是使用迭代器。 ScalaRx在这里仅仅是因为它是我从.NET大脑到Scala的唯一平台。当时我写了自己的小反应流。并且scalikejdbc 3.x添加了自己的scalikejdbc-streams库,这在我的案例中也无济于事。 –