2016-04-15 180 views
0

我试图执行基于的阿卡流快速入门指南下面的代码:阿卡流 - Source.fromPublisher

implicit val system = ActorSystem("QuickStart") 
implicit val materializer = ActorMaterializer() 

val songs = Source.fromPublisher(SongsService.stream) 

val count: Flow[Song, Int, NotUsed] = Flow[Song].map(_ => 1) 

val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _) 

val counterGraph: RunnableGraph[Future[Int]] = 
    songs 
    .via(count) 
    .toMat(sumSink)(Keep.right) 

val sum: Future[Int] = counterGraph.run() 

sum.foreach(c => println(s"Total songs processed: $c")) 

这里的问题是,未来永远不会返回结果。与文档示例最大的区别是我的Source。

我有一出戏枚举,其中我将其转换为阿卡出版商,造成这种SongsService.stream

当使用一个定义列表,就像来源:

val songs = Source(list) 

它的工作原理,但使用Source.fromPublisher不会。

但这里的问题不是出版商的确,我可以做一个简单的操作,它的工作原理:

val songs = Source.fromPublisher(SongsService.stream) 
songs.runForeach(println) 

它穿过数据库,创建播放枚举,将其转换为出版商和我可以迭代。

任何想法?

+1

如果您等待它,'songs.runForeach(println)'实际上是否返回'Done'? – Mullefa

+0

@Mullefa刚刚意识到它没有。 –

回答

4

您的发布者可能永远不会完成。