0
我试图执行基于的阿卡流快速入门指南下面的代码:阿卡流 - Source.fromPublisher
implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()
val songs = Source.fromPublisher(SongsService.stream)
val count: Flow[Song, Int, NotUsed] = Flow[Song].map(_ => 1)
val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
val counterGraph: RunnableGraph[Future[Int]] =
songs
.via(count)
.toMat(sumSink)(Keep.right)
val sum: Future[Int] = counterGraph.run()
sum.foreach(c => println(s"Total songs processed: $c"))
这里的问题是,未来永远不会返回结果。与文档示例最大的区别是我的Source。
我有一出戏枚举,其中我将其转换为阿卡出版商,造成这种SongsService.stream
当使用一个定义列表,就像来源:
val songs = Source(list)
它的工作原理,但使用Source.fromPublisher不会。
但这里的问题不是出版商的确,我可以做一个简单的操作,它的工作原理:
val songs = Source.fromPublisher(SongsService.stream)
songs.runForeach(println)
它穿过数据库,创建播放枚举,将其转换为出版商和我可以迭代。
任何想法?
如果您等待它,'songs.runForeach(println)'实际上是否返回'Done'? – Mullefa
@Mullefa刚刚意识到它没有。 –