0
我的akka流在一个元素之后停止。这是我的流:Akka流在一个元素后停止
val firehoseSource = Source.actorPublisher[FirehoseActor.RawTweet](
FirehoseActor.props(
auth = ...
)
)
val ref = Flow[FirehoseActor.RawTweet]
.map(r => ResponseParser.parseTweet(r.payload))
.map { t => println("Received: " + t); t }
.to(Sink.onComplete({
case Success(_) => logger.info("Stream completed")
case Failure(x) => logger.error(s"Stream failed: ${x.getMessage}")
}))
.runWith(firehoseSource)
FirehoseActor
连接到Twitter firehose并将消息缓冲到队列。当演员接收Request
消息时,它take
S中的下一个元素,并返回它:
def receive = {
case Request(_) =>
logger.info("Received request for next firehose element")
onNext(RawTweet(queue.take()))
}
的问题是,只有一个单一的鸣叫正在打印到控制台。该程序不会退出或抛出任何错误,我已经在周围散布了日志记录,并且没有打印任何错误。
我认为接收器会继续施加压力来拉动元件,但似乎并不是这样,因为Sink.onComplete
中的消息都没有打印出来。我也尝试使用Sink.ignore
,但也只打印单个元素。演员中的日志消息也只会打印一次。
我需要使用什么接收器才能使其无限期地通过流?