2017-05-05 51 views
0

我正在使用Akka(2.5.1)在Java(8)Spring Boot(1.5.2.RELEASE)应用程序中使用反应模式。这很好,但现在我被困在试图从actor中运行CompletableFuture。为了模拟这个,我创建了一个返回CompletableFuture的非常简单的服务。但是,当我然后尝试将结果返回给调用控制器时,我收到有关死信的错误,并且没有返回响应。从Actor中运行CompletableFuture

我正的错误是:

[INFO] [2017年5月5日13:12:25.650] [阿卡 - 弹簧 - 演示akka.actor.default-调度-5] [ akka:// akka-spring-demo/deadLetters] Actor [akka:// akka-spring-demo/user/$ a#-1561144664]的消息[java.lang.String]给Actor [akka:// akka-spring -demo/deadLetters]未送达。 [1]遇到了死信。可以关闭此日志记录,也可以使用配置设置'akka.log-dead-letters'和'akka.log-dead-letters-during-shutdown'来调整日志记录。

这是我的代码。这是调用演员控制器:

@Component 
@Produces(MediaType.TEXT_PLAIN) 
@Path("/") 
public class AsyncController { 
    @Autowired 
    private ActorSystem system; 

    private ActorRef getGreetingActorRef() { 
     ActorRef greeter = system.actorOf(SPRING_EXTENSION_PROVIDER.get(system) 
        .props("greetingActor")); 

     return greeter; 
    } 

    @GET 
    @Path("/foo") 
    public void test(@Suspended AsyncResponse asyncResponse, @QueryParam("echo") String echo) { 
     ask(getGreetingActorRef(), new Greet(echo), 1000) 
      .thenApply((greet) -> asyncResponse.resume(Response.ok(greet).build())); 
    } 
} 

这里的服务:

@Component 
public class GreetingService { 
    public CompletableFuture<String> greetAsync(String name) { 
     return CompletableFuture.supplyAsync(() -> "Hello, " + name); 
    } 
} 

那么这里就是演员接到电话。起初,我有这样的:

@Component 
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) 
public class GreetingActor extends AbstractActor { 
    @Autowired 
    private GreetingService greetingService; 

    @Autowired 
    private ActorSystem system; 

    @Override 
    public Receive createReceive() { 
     return receiveBuilder() 
       .match(Greet.class, this::onGreet) 
       .build(); 
    } 

    private void onGreet(Greet greet) { 
     greetingService.greetAsync(greet.getMessage()) 
      .thenAccept((greetingResponse) -> getSender().tell(greetingResponse, getSelf())); 
    } 

} 

这导致2个调用正确处理,但之后,我会得到死信错误。然后我读到这里什么可能导致我的问题: http://doc.akka.io/docs/akka/2.5.1/java/actors.html

警告 当使用未来的回调,你需要仔细避免关闭在包含演员的参考里面的演员,即不调用方法或访问可变状态上来自回调中的封闭演员。这会破坏actor封装并可能引入同步错误和竞争条件,因为回调将同时安排给封闭的actor。不幸的是,在编译时还没有办法检测这些非法​​访问。参见:参与者和共享的可变状态

所以我想到的是,你将结果传递给self()之后,你可以做getSender()。tell(response,getSelf())。

所以我改变了我的代码如下:

@Component 
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) 
public class GreetingActor extends AbstractActor { 
    @Autowired 
    private GreetingService greetingService; 

    @Autowired 
    private ActorSystem system; 

    @Override 
    public Receive createReceive() { 
     return receiveBuilder() 
       .match(Greet.class, this::onGreet) 
       .match(String.class, this::onGreetingCompleted) 
       .build(); 
    } 

    private void onGreet(Greet greet) { 
     pipe(greetingService.greetAsync(greet.getMessage()), system.dispatcher()).to(getSelf()); 
    } 

    private void onGreetingCompleted(String greetingResponse) { 
     getSender().tell(greetingResponse, getSelf()); 
    } 

} 

的onGreetingCompleted方法被调用从GreetingService的响应,但在那个时候,我再次得到了死字母错误,以便出于某种原因它可以”将响应发回给呼叫控制器。

需要注意的是,如果我改变服务这样的:在演员

@Component 
public class GreetingService { 
    public String greet(String name) { 
     return "Hello, " + name; 
    } 
} 

而且onGreet到:

private void onGreet(Greet greet) { 
    getSender().tell(greetingService.greet(greet.getMessage()), getSelf()); 
} 

然后一切工作正常。所以看起来我的基本Java/Spring/Akka正确设置了,它只是在我的演员试图调用一个CompletableFuture来解决问题的时候。

任何帮助将不胜感激,谢谢!

+0

deadLetters意味着要么发送消息给无效的(过时的)引用。有了这个问题,如果在响应之前询问超时(通过实际创建临时演员请求工作),就会发生这种情况。也许发布这个错误会有所帮助。尽管我必须承认我没有很多想法。 –

+0

我编辑了我的帖子以显示我收到的错误。我意识到死信的含义以及如何获得它们。我只是不知道为什么我在这个特殊情况下得到他们。 –

+0

谢谢,我需要看看什么样的消息类型是去哪里,谁是实际的预期收件人。希望答案解释你的疑惑。 –

回答

1

getSender方法只能在消息的同步处理期间可靠地返回发件人的引用。

在你的第一种情况下,你必须:

greetingService.greetAsync(greet.getMessage()) 
     .thenAccept((greetingResponse) -> getSender().tell(greetingResponse, getSelf())); 

这意味着getSender()被调用一次异步未来完成。不再可靠。您可以更改到:

ActorRef sender = getSender(); 
greetingService.greetAsync(greet.getMessage()) 
     .thenAccept((greetingResponse) -> sender.tell(greetingResponse, getSelf())); 

在你的第二个例子,你有

pipe(greetingService.greetAsync(greet.getMessage()), system.dispatcher()).to(getSelf()); 

您管道将响应 “getSelf()”,即你的工人演员。原始发件人将永远不会收到任何内容(因此问题会过期)。您可以修复到:

pipe(greetingService.greetAsync(greet.getMessage()), system.dispatcher()).to(getSender()); 

在第三种情况下,你有getSender()被该消息的处理过程中同步执行的,因此它的工作原理。

+0

谢谢Diego!这确实有效。周末愉快! –