2016-02-05 95 views
0

我有以下情况,我试图将处理委托给角色。我想要发生的是每当我的流程处理消息时,它就会将它发送给演员,演员会将其大写,并将其作为响应写入流中。Akka流和委托处理给演员

所以我应该能够连接到端口8000,输入“hello”,让流发送给演员,并让演员将其发回给流,以便它回传给我uppercased。演员本身非常基本,来自文档中的ActorPublisher示例。

我知道这段代码不起作用,我清理了我的实验以便编译它。现在,它只是两个独立的流。我试图合并源或汇,但无济于事。

object Sample { 
    def main(args: Array[String]): Unit = { 
    implicit val system = ActorSystem("sample") 
    implicit val materializer = ActorMaterializer() 

    val connections: Source[IncomingConnection, 
     Future[ServerBinding]] = Tcp().bind("localhost", 8000) 
    val filter = Source.actorPublisher[ByteString](Props[Filter]) 

    val filterRef = Flow[ByteString] 
     .to(Sink.ignore) 
     .runWith(filter) 

    connections runForeach { conn => 
     val echo = Flow[ByteString] .map { 


     // would like to send 'p' to the actor, 
     // and have it publish to the stream 
     case p:ByteString => filterRef ! p 
     } 
    } 
    } 
} 

// this actor is supposed to simply uppercase all 
// input and write it to the stream 
class Filter extends ActorPublisher[ByteString] with Actor 
{ 
    var buf = Vector.empty[ByteString] 
    val delay = 0 

    def receive = { 
    case p: ByteString => 
     if (buf.isEmpty && totalDemand > 0) 
     onNext(p) 
     else { 
     buf :+= ByteString(p.utf8String.toUpperCase) 
     deliverBuf() 
     } 
    case Request(_) => 
     deliverBuf() 
    case Cancel => 
     context.stop(self) 
    } 


    @tailrec final def deliverBuf(): Unit = 
    if (totalDemand > 0) { 
     if (totalDemand <= Int.MaxValue) { 
     val (use, keep) = buf.splitAt(totalDemand.toInt) 
     buf = keep 
     use foreach onNext 
     } else { 
     val (use, keep) = buf.splitAt(Int.MaxValue) 
     buf = keep 
     use foreach onNext 
     deliverBuf() 
     } 
    } 
} 
+0

如果你需要一个定制的演员身份参与的一部分' Flow',为什么不让它成为一个普通的'Actor'(不是发布者或订阅者),然后通过'ask'和'mapAsync'集成到'Flow'中?代码非常简单,您可以将反应流背压处理代码委托给框架,让您的演员专注于做它需要做的事情。 – cmbaxter

+0

我没有在原始文章中指定,因为我不想降低问题的精神,但在某些情况下,我希望演员正在处理不同的流,但能够发布到连接流。在这种情况下,如果目标(非源)流没有任何活动,我将无法使用“询问”。 –

回答

1

我以前也有过这个问题,有点迂回的解决方法,希望你用这种方式可以。从本质上讲,它涉及到创建一个接收器,该接收器立即将其获取的消息转发给src actor。

当然,你也可以使用直接流(注释出来),但我想这不是这个练习:)点

object Sample { 
    def main(args: Array[String]): Unit = { 
    implicit val system = ActorSystem("sample") 
    implicit val materializer = ActorMaterializer() 

    val connections: Source[IncomingConnection, 
     Future[ServerBinding]] = Tcp().bind("localhost", 8000) 

    def filterProps = Props[Filter] 

    connections runForeach { conn => 
     val actorRef = system.actorOf(filterProps) 
     val snk = Sink.foreach[ByteString]{s => actorRef ! s} 
     val src = Source.fromPublisher(ActorPublisher[ByteString](actorRef)) 
     conn.handleWith(Flow.fromSinkAndSource(snk, src)) 

//  conn.handleWith(Flow[ByteString].map(s => ByteString(s.utf8String.toUpperCase()))) 
    } 
    } 
} 

// this actor is supposed to simply uppercase all 
// input and write it to the stream 
class Filter extends ActorPublisher[ByteString] 
{ 
    import akka.stream.actor.ActorPublisherMessage._ 
    var buf = mutable.Queue.empty[String] 
    val delay = 0 

    def receive = { 
    case p: ByteString => 
     buf += p.utf8String.toUpperCase 
     deliverBuf() 
    case Request(n) => 
     deliverBuf() 
    case Cancel => 
     context.stop(self) 
    } 

    def deliverBuf(): Unit = { 
    while (totalDemand > 0 && buf.nonEmpty) { 
     val s = ByteString(buf.dequeue() + "\n") 
     onNext(s) 
    } 
    } 
+0

谢谢,这个工程。如果演员有不同的来源,但是我仍然想要输出到连接,您是否看到任何理由不起作用?例如,演员的来源是一个内部事件泵,当事件发生时,将其发布到连接。 –

+0

没有理由不应该。只要有需求,Filter actor可以生成自己的数据并在需要时发布它。我在我的代码中这样做。这是我不得不使用ActorPublisher的主要原因。 –

+0

谢谢。使用这种语法,其中Flow是从source和sink动态创建的,是否有办法将控制权交还给处理连接的原始流?我迷失在这个溪流中。如何将Flow.fromSinkAndSource()连接到下一个流程? –