2016-09-15 104 views
3

我想通过Java中的WebSockets实现一个简单的RPC(或请求响应)系统(前端将有JS,但我现在正在处理后端)。如何处理Java 8 CompletableFuture中的错误?

我试图应用Java CompletableFuture模式来处理异步发送消息。但是我陷入了错误处理的困境。

我有一个类(我们称之为rpc类)负责通过WebSocket会话发送消息(在此使用Spring WebSocket支持类),然后等待“回复”类型的消息,并将它们与待处理的请求并将内容返回给调用者。

的流程为:

  • 客户端代码调用上的RPC类中的方法,以指定要在远程过程调用过程的名称,到该会话发送消息,并且在地图上的参数发送。
  • 的RPC类用于发送异步使用Executor(线程池)的消息的另一个较低级类,并为“发送消息”操作
  • 它存储在地图中的挂起的请求接收CompletableFuture<Void>,建立一个CompletableFuture<Map<String, Object>>并将其与未决请求相关联,并将它们存储在地图中。它返回完美的未来。
  • 当收到“回复”类型的消息时,将在同一个类上调用一个方法,此方法试图将响应与其中一个未决请求(它们具有一个ID)进行匹配,然后完成CompletableFuture在回复中收到的内容。

所以有3个线程:调用者线程,发送消息的线程和接收消息并完成未来的线程。

现在,我应该如何处理发送消息中的错误(例如IO错误),以便使返回的completableFuture也失败(或者可能实现重试策略,并超时...)?

这里是发送消息的RPC类方法的代码:使用thencompose()期货

/** 
* Invoke a remote procedure over WS on the specified session, with the given arguments. 
* @param session The target session on which to send the RPC message 
* @param target The name of the procedure to call 
* @param arguments The arguments to be sent in the message 
* @return 
*/ 
public CompletableFuture<Map<String,Object>> invoke(WebSocketSession session, String target, Map<String, Object> arguments){ 
    Invocation invocationMessage = new Invocation(target, arguments); 
    invocationMessage.setId(getNextId()); 

    // completeable future for the result. It does nothing, will be completed when reply is received which happen in a different thread, see completeInvocation 
    CompletableFuture<Map<String, Object>> invocationFuture = new CompletableFuture<>(); 

    CompletableFuture<Void> senderFuture = sender.sendMessage(session, invocationMessage); 

    // handle problem in the sending of the message 
    senderFuture.exceptionally(e -> { 
     // is this correct ?? 
     invocationFuture.completeExceptionally(e); 
     return null; 
    }); 

    // store the pending invocation in the registry 
    registry.addPendingInvocation(new PendingInvocation(invocationMessage, session, invocationFuture)); 

    // return the future so the caller can have access to the result once it is ready 
    return invocationFuture; 
} 
+0

那么,我写了一些测试,并验证了行为,它实际上是我写它的方式。 –

回答

0

做这将是最简单的方法来简单地连锁:

// completeable future for the result. It does nothing, will be completed when reply is received which happen in a different thread, see completeInvocation 
CompletableFuture<Map<String, Object>> invocationFuture = new CompletableFuture<>(); 

CompletableFuture<Void> senderFuture = sender.sendMessage(session, invocationMessage); 

// store the pending invocation in the registry 
registry.addPendingInvocation(new PendingInvocation(invocationMessage, session, invocationFuture)); 

// return the future so the caller can have access to the result once it is ready 
return senderFuture.thenCompose(__ -> invocationFuture); 

万一例外完成senderFuture,返回的未来也将异常完成,其中CompletionException持有异常为原因(参见CompletionStage api)。

注意,有可能是您想解决以及其他问题:

  • 你不应该也取消挂起调用异常的情况?
  • 如果响应在addPendingInvocation被调用之前到达,会发生什么?在致电sendMessage以避免问题之前,您不应该打电话吗?
  • 既然你没有对invocationFuture做任何事情,那么在addPenidngInvocation里面创建它会不会更好?