2017-05-29 112 views
0

受到T.Nurkiewicz的“用RxJava进行反应式编程”的启发,我尝试将它应用到我正在处理的项目中,这是我面临的问题。RxJava:OnErrorFailedException。识别正确的原因

我有一个Rest端点,它接受一个输入流和一个用户名,并且为更新的用户名返回一个链接或返回一个错误的请求错误。以下是我想实现这个使用RxJava:

@PUT 
    @Path("{username}") 
    public Response updateCredential(@PathParam("username") final String username, InputStream stream) { 
     CredentialCandidate candidate = new CredentialCandidate(); 
     Observable.just(repository.getByUsername(username)) 
       .subscribe(
        credential -> { 
          serializeCandidate(candidate, stream); 
          try { 
           repository.updateCredential(build(credential, candidate)); 
          } catch (Exception e) { 
           String msg = "Failed to update credential +\""+username+"\": "+e.getMessage(); 
           throw new BadRequestException(msg, Response.status(Response.Status.BAD_REQUEST).build()); 
          } 
         }, 
         ex -> { 
          String msg = "Couldn't update credential \""+username+"\"" 
          + ". A credential with such username doesn't exist: " + ex.getMessage(); 
          logger.error(msg); 
          throw new BadRequestException(msg, Response.status(Response.Status.BAD_REQUEST).build()); 
       });//if the Observable completes without exceptions we have a success case 
     Map<String, String> map = new HashMap<>(); 
     map.put("path", "credential/" + username); 
     return Response.ok(getJsonRepr("link", uriGenerator.apply(appsUriBuilder, map).toASCIIString())).build(); 
} 

我的问题是在该行11(onNext方法的catch子句)。这是日志输出,迅速将展示发生了什么:

19:23:50.472 [http-listener(4)] ERROR com.vgorcinschi.rimmanew.rest.services.CredentialResourceService    - Couldn't update credential "admin". A credential with such username doesn't exist: Failed to update credential +"admin": Password too weak! 

所以在onNext方法抛出的异常用于上游和结束行动的onError方法!显然this works as designed,但我很困惑,我可以如何返回错误请求错误的正确原因。毕竟在我的测试用例中,存储库找到了用户的凭证,但是正确的错误是建议的密码太弱。这是产生错误的helper方法:

private Credential build(Credential credential, CredentialCandidate candidate) { 
     if(!isOkPsswd.test(candidate.getPassword())){ 
      throw new BadRequestException("Password too weak!", Response.status(Response.Status.BAD_REQUEST).build()); 
     } 
... 
} 

我仍然相当新的反应式编程,所以我意识到我可能会丢失的东西是显而易见的。浏览本书并没有让我得到答案,所以我会很感激任何帮助。

以防万一,这是完整的堆栈跟踪:

updateCredentialTest(com.vgorcinschi.rimmanew.services.CredentialResourceServiceTest) Time elapsed: 0.798 sec <<< ERROR! 
rx.exceptions.OnErrorFailedException: Error occurred when trying to propagate error to Observer.onError 
    at com.vgorcinschi.rimmanew.rest.services.CredentialResourceService.lambda$updateCredential$9(CredentialResourceService.java:245) 
    at rx.internal.util.ActionSubscriber.onNext(ActionSubscriber.java:39) 
    at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134) 
    at rx.internal.util.ScalarSynchronousObservable$WeakSingleProducer.request(ScalarSynchronousObservable.java:276) 
    at rx.Subscriber.setProducer(Subscriber.java:209) 
    at rx.Subscriber.setProducer(Subscriber.java:205) 
    at rx.internal.util.ScalarSynchronousObservable$JustOnSubscribe.call(ScalarSynchronousObservable.java:138) 
    at rx.internal.util.ScalarSynchronousObservable$JustOnSubscribe.call(ScalarSynchronousObservable.java:129) 
    at rx.Observable.subscribe(Observable.java:10238) 
    at rx.Observable.subscribe(Observable.java:10205) 
    at rx.Observable.subscribe(Observable.java:10045) 
    at com.vgorcinschi.rimmanew.rest.services.CredentialResourceService.updateCredential(CredentialResourceService.java:238) 
    at com.vgorcinschi.rimmanew.services.CredentialResourceServiceTest.updateCredentialTest(CredentialResourceServiceTest.java:140) 

回答

3

这似乎你没有掌握无编程原理权。

第一件事是,Observable是通过他们的API是异步的,而你正试图强制执行是同步的API,通过尝试直接从方法返回Response值,而不是返回Observable<Response>随着时间的推移射向Response值通过其onNext()通知。
这就是为什么你要处理异常的原因,为了创建符合某些规则的适当流(Observable contract),每个通知lambda方法(onNext/onError)被Observable机制封装,其中一些预期行为是错误应该重定向到onError()方法,这是一个异常捕获方法,你不应该扔到那里,抛出那里将被认为是致命的错误,并且会被抛出OnErrorFailedException吞下。

理想的情况下它会是这样的:为了使订阅时(而Observable.just(repository.getByUsername(username))将起到同步时可观察的是结构)的要求发生

public Observable<Response> updateCredential(@PathParam("username") final String username, 
              InputStream stream) { 
    rerurn Observable.fromCallable(() -> { 
     CredentialCandidate candidate = new CredentialCandidate(); 
     Credential credential = repository.getByUsername(username); 
     serializeCandidate(candidate, stream); 
     repository.updateCredential(build(credential, candidate)); 
     Map<String, String> map = new HashMap<>(); 
     map.put("path", "credential/" + username); 
     return Response.ok(getJsonRepr("link", uriGenerator.apply(appsUriBuilder, map).toASCIIString())).build(); 
    }) 
      .onErrorReturn(throwable -> { 
       String msg = "Failed to update credential +\"" + username + "\": " + e.getMessage(); 
       throw new BadRequestException(msg, Response.status(Response.Status.BAD_REQUEST).build()); 
      }); 
} 

使用fromCallable,成功路径withing可调用本身,如果发生任何错误,您将使用onErrorReturn运算符将其转换为您的自定义异常。

用他的方法你将返回Observable对象,当你订阅它时,你将获得Observable的所有好处和反应方法,例如能够将它与其他一些操作组合起来,能够从外部指定它是否会同步(当前线程)或某个其他线程上的异步(使用Scheduler)。

关于反应式编程的更多详细解释,我建议从AndréStaltz的这个伟大的tutorial开始。