我有以下情况,我试图将处理委托给角色。我想要发生的是每当我的流程处理消息时,它就会将它发送给演员,演员会将其大写,并将其作为响应写入流中。Akka流和委托处理给演员
所以我应该能够连接到端口8000,输入“hello”,让流发送给演员,并让演员将其发回给流,以便它回传给我uppercased。演员本身非常基本,来自文档中的ActorPublisher示例。
我知道这段代码不起作用,我清理了我的实验以便编译它。现在,它只是两个独立的流。我试图合并源或汇,但无济于事。
object Sample {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("sample")
implicit val materializer = ActorMaterializer()
val connections: Source[IncomingConnection,
Future[ServerBinding]] = Tcp().bind("localhost", 8000)
val filter = Source.actorPublisher[ByteString](Props[Filter])
val filterRef = Flow[ByteString]
.to(Sink.ignore)
.runWith(filter)
connections runForeach { conn =>
val echo = Flow[ByteString] .map {
// would like to send 'p' to the actor,
// and have it publish to the stream
case p:ByteString => filterRef ! p
}
}
}
}
// this actor is supposed to simply uppercase all
// input and write it to the stream
class Filter extends ActorPublisher[ByteString] with Actor
{
var buf = Vector.empty[ByteString]
val delay = 0
def receive = {
case p: ByteString =>
if (buf.isEmpty && totalDemand > 0)
onNext(p)
else {
buf :+= ByteString(p.utf8String.toUpperCase)
deliverBuf()
}
case Request(_) =>
deliverBuf()
case Cancel =>
context.stop(self)
}
@tailrec final def deliverBuf(): Unit =
if (totalDemand > 0) {
if (totalDemand <= Int.MaxValue) {
val (use, keep) = buf.splitAt(totalDemand.toInt)
buf = keep
use foreach onNext
} else {
val (use, keep) = buf.splitAt(Int.MaxValue)
buf = keep
use foreach onNext
deliverBuf()
}
}
}
如果你需要一个定制的演员身份参与的一部分' Flow',为什么不让它成为一个普通的'Actor'(不是发布者或订阅者),然后通过'ask'和'mapAsync'集成到'Flow'中?代码非常简单,您可以将反应流背压处理代码委托给框架,让您的演员专注于做它需要做的事情。 – cmbaxter
我没有在原始文章中指定,因为我不想降低问题的精神,但在某些情况下,我希望演员正在处理不同的流,但能够发布到连接流。在这种情况下,如果目标(非源)流没有任何活动,我将无法使用“询问”。 –