2016-06-28 86 views
0

我的akka​​流在一个元素之后停止。这是我的流:Akka流在一个元素后停止

val firehoseSource = Source.actorPublisher[FirehoseActor.RawTweet](
    FirehoseActor.props(
    auth = ... 
) 
) 

val ref = Flow[FirehoseActor.RawTweet] 
    .map(r => ResponseParser.parseTweet(r.payload)) 
    .map { t => println("Received: " + t); t } 
    .to(Sink.onComplete({ 
    case Success(_) => logger.info("Stream completed") 
    case Failure(x) => logger.error(s"Stream failed: ${x.getMessage}") 
    })) 
    .runWith(firehoseSource) 

FirehoseActor连接到Twitter firehose并将消息缓冲到队列。当演员接收Request消息时,它take S中的下一个元素,并返回它:

def receive = { 
    case Request(_) => 
    logger.info("Received request for next firehose element") 
    onNext(RawTweet(queue.take())) 
} 

的问题是,只有一个单一的鸣叫正在打印到控制台。该程序不会退出或抛出任何错误,我已经在周围散布了日志记录,并且没有打印任何错误。

我认为接收器会继续施加压力来拉动元件,但似乎并不是这样,因为Sink.onComplete中的消息都没有打印出来。我也尝试使用Sink.ignore,但也只打印单个元素。演员中的日志消息也只会打印一次。

我需要使用什么接收器才能使其无限期地通过流?

回答

0

啊我应该在我的演员中尊重totalDemand。这修复该问题:

def receive = { 
    case Request(_) => 
    logger.info("Received request for next firehose element") 
    while (totalDemand > 0) { 
     onNext(RawTweet(queue.take())) 
    } 

我期待收到Request对于流中的每一个元素,但显然每个流将发送Request