2017-06-08 72 views
0

有两个表TableATableB如何一些记录从表A复制到表B具有光滑的流媒体和阿卡串流

我需要一些记录复制从TableATableB。我用slick-3.0,并使用以下方法:

import akka.stream._ 
import akka.stream.scaladsl._ 
... 

//{{ READ DATA FROM TABLE A 
val q = TableA.filter(somePredicate).result 
val source = Source.fromPublisher { 
     db.stream(q.result).mapResult { r => 
     val record: RecordA = someTransformation(r) 
     record 
     } 
    }.grouped(50) // grouping because I want to write records in batch mode 
//}} 

//{{ WRITE DATA TO TABLE B 
val f:Future[Done] = source.runWith(Sink.foreach { batch: Seq[RecordA] => 
     //TODO how to write batch to TableB asynchronously? 
     val insertAction = TableB ++= batch // insert batch to table 
     val fInsert: Future[_] = db.run(insertAction) 
     Await.result(fInsert, ...)   // #1 this works only with blocking 
}) 
//}} 

但我面临着一个问题 - 如何写批处理TableB异步(参见TODO)。现在上面的代码只适用于内部未来(参见#1评论)。有异步实现该任务的正确方法吗?

+0

如果你不阻止内在的未来会发生什么? – thwiegan

+0

@thwiegan,如果我不阻止内在的未来并返回它然后它不会完成 –

+1

这似乎是你的用例:https://stackoverflow.com/questions/36400152/how-are-reactive-streams -used-in-slick-for-inserted-data我没有看到与你的例子 – thwiegan

回答

2

使用mapAsync它期望返回未来,公开“展开”结果在下一阶段。

source.mapAsync(4){batch: Seq[RecordA] => 
     val insertAction = TableB ++= batch // insert batch to table 
     db.run(insertAction) 
}).to(Sink.ignore).run