我想通过Java中的WebSockets实现一个简单的RPC(或请求响应)系统(前端将有JS,但我现在正在处理后端)。如何处理Java 8 CompletableFuture中的错误?
我试图应用Java CompletableFuture
模式来处理异步发送消息。但是我陷入了错误处理的困境。
我有一个类(我们称之为rpc类)负责通过WebSocket会话发送消息(在此使用Spring WebSocket支持类),然后等待“回复”类型的消息,并将它们与待处理的请求并将内容返回给调用者。
的流程为:
- 客户端代码调用上的RPC类中的方法,以指定要在远程过程调用过程的名称,到该会话发送消息,并且在地图上的参数发送。
- 的RPC类用于发送异步使用
Executor
(线程池)的消息的另一个较低级类,并为“发送消息”操作 - 它存储在地图中的挂起的请求接收
CompletableFuture<Void>
,建立一个CompletableFuture<Map<String, Object>>
并将其与未决请求相关联,并将它们存储在地图中。它返回完美的未来。 - 当收到“回复”类型的消息时,将在同一个类上调用一个方法,此方法试图将响应与其中一个未决请求(它们具有一个ID)进行匹配,然后完成
CompletableFuture
在回复中收到的内容。
所以有3个线程:调用者线程,发送消息的线程和接收消息并完成未来的线程。
现在,我应该如何处理发送消息中的错误(例如IO错误),以便使返回的completableFuture
也失败(或者可能实现重试策略,并超时...)?
这里是发送消息的RPC类方法的代码:使用thencompose()
期货
/**
* Invoke a remote procedure over WS on the specified session, with the given arguments.
* @param session The target session on which to send the RPC message
* @param target The name of the procedure to call
* @param arguments The arguments to be sent in the message
* @return
*/
public CompletableFuture<Map<String,Object>> invoke(WebSocketSession session, String target, Map<String, Object> arguments){
Invocation invocationMessage = new Invocation(target, arguments);
invocationMessage.setId(getNextId());
// completeable future for the result. It does nothing, will be completed when reply is received which happen in a different thread, see completeInvocation
CompletableFuture<Map<String, Object>> invocationFuture = new CompletableFuture<>();
CompletableFuture<Void> senderFuture = sender.sendMessage(session, invocationMessage);
// handle problem in the sending of the message
senderFuture.exceptionally(e -> {
// is this correct ??
invocationFuture.completeExceptionally(e);
return null;
});
// store the pending invocation in the registry
registry.addPendingInvocation(new PendingInvocation(invocationMessage, session, invocationFuture));
// return the future so the caller can have access to the result once it is ready
return invocationFuture;
}
那么,我写了一些测试,并验证了行为,它实际上是我写它的方式。 –