2016-03-02 43 views
0

我刚开始尝试让我的头绕过RxJava,以便我可以使用项目反应堆重构遗留SOA系统使用非阻塞异步微服务。重构遗留SOA系统使用非阻塞异步微服务

在我做了可行性研究,并考虑使用像勺子来改造传统服务代码的时刻(有无关然而这个问题)

我想知道我会怎样使用reactor-bus Request/Reply语法来替换此同步服务代码。或者即使我应该使用完全不同的反应器结构。

下面是一个传统的soa服务的例子,它被设计出来,因此它可能没有什么意义,但基本上每个服务都依赖于最后的结果。

public static Map<String, Object> createAccount(DispatchContext dctx, Map<String, Object> context) { 
    LocalDispatcher dispatcher = dctx.getDispatcher(); 

    String accountPartyId = (String) context.get("partyId"); 

    Map<String, Object> input = UtilMisc.toMap("groupName", context.get("accountName"), "groupNameLocal", context.get("groupNameLocal"), "officeSiteName", context.get("officeSiteName"), "description", context.get("description"), "partyId", accountPartyId); 

    Map<String, Object> serviceResults1 = dispatcher.runSync("createPartyGroup", input); 

    Map<String, Object> serviceResults2 = dispatcher.runSync("createPartyRole", UtilMisc.toMap("partyId", (String) serviceResults1.get("partyId"), "roleTypeId", "ACCOUNT")); 

    String dataSourceId = (String) context.get("dataSourceId"); 
    Map<String, Object> serviceResults3 = null; 
    if (dataSourceId != null) { 
     serviceResults3 = dispatcher.runSync("crmsfa.addAccountDataSource", UtilMisc.toMap("partyId", (String) serviceResults2.get("partyId"), "dataSourceId", dataSourceId)); 
    } 

    String marketingCampaignId = (String) context.get("marketingCampaignId"); 
    Map<String, Object> serviceResults4 = null; 
    if (marketingCampaignId != null) { 
     serviceResults4 = dispatcher.runSync("crmsfa.addAccountMarketingCampaign", UtilMisc.toMap("partyId", (String) serviceResults3.get("partyId"), "marketingCampaignId", marketingCampaignId)); 

    } 

    String initialTeamPartyId = (String) context.get("initialTeamPartyId"); 
    Map<String, Object> serviceResults5 = null; 
    if (initialTeamPartyId != null) { 
     serviceResults5 = dispatcher.runSync("crmsfa.assignTeamToAccount", UtilMisc.toMap("accountPartyId", (String) serviceResults4.get("partyId"), "teamPartyId", initialTeamPartyId, "userLogin", userLogin)); 
    } 

    Map<String, Object> results = ServiceUtil.returnSuccess(); 
    results.put("groupId", (String) serviceResults1.get("groupId")); 
    results.put("roleId", (String) serviceResults2.get("roleId")); 
    results.put("dataSourceId", (String) serviceResults3.get("dataSourceId")); 
    results.put("marketingCampaignId", (String) serviceResults4.get("marketingCampaignId")); 
    results.put("teamPartyId", (String) serviceResults5.get("teamPartyId")); 
    return results; 
} 

基本上这是一个调用使用dispatcher.runSync其他服务的服务......我只是在寻找一个起点,我的研究如何可能要使用的反应器,甚至另一个库将这一类型的语法转换为异步非阻塞代码。

在这一点上,我在回调/某种Promise类型结构非常模糊的术语。

像第一次调用另一个服务是

Map<String, Object> serviceResults = dispatcher.runSync("createPartyGroup", input); 

如果这不是返回包含serviceResults映射,则该方法的其余部分可以移动到无极的onComplete块,其结果将是一个承诺对象构成这种服务方法的一堆深度嵌套的onComplete代码块。

Promise p = task { 
    // createPartyGroup service call 
} 
p.onComplete { result -> 

Promise p2 = task { 
    // createPartyRole sevice call 
} 

p2.onComplete { result -> 
//next service call 
} 
} 
} 

或寻找在反应器总线文档类似不使在许多层面上的意义,我只是不知道有足够的了解反应堆知道为什么它没有意义或者什么学习以下接下来让我明白为什么它没有任何意义

bus.send("service.createPartyGroup", Event.wrap(input, "reply.service.createPartyGroup")); 
bus.receive($("reply.service.createPartyGroup"), ev -> { 
Map<?> input2 = UtilMisc.toMap("partyId", (String) ev.get("partyId"), "roleTypeId", "ACCOUNT") 
    bus.send("service.createPartyRole", Event.wrap(input2, "reply.service.createPartyRole")); 
}); 

我意识到这是一个相当奇怪的地方开始研究了反应式编程范式。但是替换这个同步服务代码是我的最终目标,如果我理解了至少可以从中反向工作的语法。

回答

0

你只需要使用Observable,在你的流程中,一个发出的项目通过流。 检查文档https://github.com/ReactiveX/RxJava

这将是一个连续的流

Observable.just(methodThatCallFirstServiceAndReturnObservable(params)) 
      .flatMap(resul1 -> methodThatCallSecondAndReturnObservable(resul1)) 
      .flatMap(resul2 -> methodThatCallThirdAndReturnObservable(resul2)) 
      .subscribe(result3->"Last value emmited here:"); 

您可同时运行三个服务调用和使用Observable.zip得到所有的值加在一起或合并。但我相信这不是你在这里需要的。

+0

好的我看到了一般形式,但我怎样才能发送同步输入地图作为事件,以及如何以事件的形式返回结果地图...有总线管道等可以详细说明如何你的代码片段涉及到bus.send bus.receive或... on()..notify()概念 – user2804010

+0

抱歉,不理解这个问题 – paul