2017-05-15 31 views
1

如何更改订户的错误?我有一个从数据库中消耗的冷流。请参阅以下情况:错误的可观察更改订户

return coctailBundleStream 
     .doOnNext(c -> { 
      hostnames.add(c.get(KEY_HOSTNAME)); // [A] 

      sendToOutboundQueue(c.get(KEY_CREDS)); 
      archiveSentMessage(c.get(KEY_CREDS), c.get(KEY_MESSAGE_ID)); 
     }) 
     .doOnComplete(this::saveCutOffTime) 
     .doOnError(e -> informUserImpactedHostnames(hostnames, 
      theRestOfHostnamesInside(credsXmlStream, e))) // I don't think this is right 
     .onErrorResumeNext(Flowable.empty()) 
     .count(); 

我想发送受故障影响的所有主机名。但是,请参阅我上面的评论。我不认为这是正确的,因为流被消耗了两次。例如如果theRestOfHostnamesInside实现是credsStream.map(c -> c.getHostname()), e)

我认为,理想情况下,错误处理程序应该使用其他认购其提取主机名的其余部分到一个列表,然后追加与以前的清单列表中继续流(见行标与[A])。

回答

0

onErrorResumeNext应该用来提供您想要回退的流动性。

但是,主要困难是避免重复。如果源代码很冷,您将重新执行数据库请求,并且如果最初订阅的序列在出现错误之前发送了一些数据,则会重新发送相同的数据。

您可以通过在onErrorResumeNext之后链接distinct(您应该能够提供keySelector来指示如何检测重复项)。但是您必须确保使用不会将源中的两个元素标记为重复的标准(以便仅消除重试创建的重复项)。

周围的另一种方式是存储密钥已经处理自己的集合中,并在onErrorResumeNext过滤这些,但是你必须确保该收集特定于count()的下游制造每个subscribe ...所以不那很容易。

+0

我不喜欢重做相同的数据库请求的想法。这对我来说似乎没有效率。 – sancho21

+0

它取决于原始错误是什么,但是你可以退回第二个序列,只查询丢失的键......因为'onErrorResumeNext'将来自错误序列(如果有的话)的数据与来自后备的数据连接起来顺序,这符合法案 –

0

你可以这样做只是内部的flatMap

 Observable.fromIterable(yourList) 
      .flatMap(x ->{ 
       Observable.just(x) 
         .map(data -> yourNormalSave(data)) 
         .onErrorReturn(errorResult) 
      }) 
      .subscribe(result ->{ 
         if(result != errorResult) 
          count++; 
         eles{ 
         //error received here 
         } 
      } 
      ); 

所以flatMap内的,如果你遇到一个错误,将其转换为普通类型,但下游没有它的想法。所以你的下游用户也会在onNext上使用它们。你可以正确地做,而不是x -> Observable.just(x)我只是把它们作为一个例子。