我正在使用Akka(2.5.1)在Java(8)Spring Boot(1.5.2.RELEASE)应用程序中使用反应模式。这很好,但现在我被困在试图从actor中运行CompletableFuture。为了模拟这个,我创建了一个返回CompletableFuture的非常简单的服务。但是,当我然后尝试将结果返回给调用控制器时,我收到有关死信的错误,并且没有返回响应。从Actor中运行CompletableFuture
我正的错误是:
[INFO] [2017年5月5日13:12:25.650] [阿卡 - 弹簧 - 演示akka.actor.default-调度-5] [ akka:// akka-spring-demo/deadLetters] Actor [akka:// akka-spring-demo/user/$ a#-1561144664]的消息[java.lang.String]给Actor [akka:// akka-spring -demo/deadLetters]未送达。 [1]遇到了死信。可以关闭此日志记录,也可以使用配置设置'akka.log-dead-letters'和'akka.log-dead-letters-during-shutdown'来调整日志记录。
这是我的代码。这是调用演员控制器:
@Component
@Produces(MediaType.TEXT_PLAIN)
@Path("/")
public class AsyncController {
@Autowired
private ActorSystem system;
private ActorRef getGreetingActorRef() {
ActorRef greeter = system.actorOf(SPRING_EXTENSION_PROVIDER.get(system)
.props("greetingActor"));
return greeter;
}
@GET
@Path("/foo")
public void test(@Suspended AsyncResponse asyncResponse, @QueryParam("echo") String echo) {
ask(getGreetingActorRef(), new Greet(echo), 1000)
.thenApply((greet) -> asyncResponse.resume(Response.ok(greet).build()));
}
}
这里的服务:
@Component
public class GreetingService {
public CompletableFuture<String> greetAsync(String name) {
return CompletableFuture.supplyAsync(() -> "Hello, " + name);
}
}
那么这里就是演员接到电话。起初,我有这样的:
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class GreetingActor extends AbstractActor {
@Autowired
private GreetingService greetingService;
@Autowired
private ActorSystem system;
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Greet.class, this::onGreet)
.build();
}
private void onGreet(Greet greet) {
greetingService.greetAsync(greet.getMessage())
.thenAccept((greetingResponse) -> getSender().tell(greetingResponse, getSelf()));
}
}
这导致2个调用正确处理,但之后,我会得到死信错误。然后我读到这里什么可能导致我的问题: http://doc.akka.io/docs/akka/2.5.1/java/actors.html
警告 当使用未来的回调,你需要仔细避免关闭在包含演员的参考里面的演员,即不调用方法或访问可变状态上来自回调中的封闭演员。这会破坏actor封装并可能引入同步错误和竞争条件,因为回调将同时安排给封闭的actor。不幸的是,在编译时还没有办法检测这些非法访问。参见:参与者和共享的可变状态
所以我想到的是,你将结果传递给self()之后,你可以做getSender()。tell(response,getSelf())。
所以我改变了我的代码如下:
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class GreetingActor extends AbstractActor {
@Autowired
private GreetingService greetingService;
@Autowired
private ActorSystem system;
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Greet.class, this::onGreet)
.match(String.class, this::onGreetingCompleted)
.build();
}
private void onGreet(Greet greet) {
pipe(greetingService.greetAsync(greet.getMessage()), system.dispatcher()).to(getSelf());
}
private void onGreetingCompleted(String greetingResponse) {
getSender().tell(greetingResponse, getSelf());
}
}
的onGreetingCompleted方法被调用从GreetingService的响应,但在那个时候,我再次得到了死字母错误,以便出于某种原因它可以”将响应发回给呼叫控制器。
需要注意的是,如果我改变服务这样的:在演员
@Component
public class GreetingService {
public String greet(String name) {
return "Hello, " + name;
}
}
而且onGreet到:
private void onGreet(Greet greet) {
getSender().tell(greetingService.greet(greet.getMessage()), getSelf());
}
然后一切工作正常。所以看起来我的基本Java/Spring/Akka正确设置了,它只是在我的演员试图调用一个CompletableFuture来解决问题的时候。
任何帮助将不胜感激,谢谢!
deadLetters意味着要么发送消息给无效的(过时的)引用。有了这个问题,如果在响应之前询问超时(通过实际创建临时演员请求工作),就会发生这种情况。也许发布这个错误会有所帮助。尽管我必须承认我没有很多想法。 –
我编辑了我的帖子以显示我收到的错误。我意识到死信的含义以及如何获得它们。我只是不知道为什么我在这个特殊情况下得到他们。 –
谢谢,我需要看看什么样的消息类型是去哪里,谁是实际的预期收件人。希望答案解释你的疑惑。 –