2017-07-28 275 views
0

我对被动世界非常陌生,并且很难理解如何完成任务。我正在开发一个遗留项目,我必须实现一个接口,该接口有许多方法可以从redis中查询各种对象。有时查询就像通过ID查询散列一样简单,因此只需一次调用redis即可获取散列。其他时候,我可能需要首先根据一些参数从redis集合中查找ID,然后使用生成的ID获取哈希值。我在Spring Boot应用程序中使用Reactor 3.1.0.M3和Lettuce 5.0.0.RC1。将多个反应性Publisher/Flux/Mono对象组合到阻止请求中

我现有的代码,以这两个实例方法是这样的:

public <T extends CatalogInfo> T get(String id, Class<T> clazz) { 
     String result = (String)repository.getRedisHashRepository().getHashById(CatalogUtils.root(clazz).getSimpleName(), id); 
     if (null != result) { 
      return serializer.fromData(result, clazz); 
     } 
     return null; 

    } 

    public <T extends CatalogInfo> T get(String attName, String attValue, Class<T> clazz) { 

     String attKey = CatalogUtils.buildKey(CatalogUtils.root(clazz), attName, attValue); 

     String id = CatalogUtils.getIdFromSet(repository.getRedisSetRepository().getSetMembers(attKey)); 
     if (id == null) { 
      return null; 
     } 
     return get(id, clazz); 
    } 

里面还有一些实用功能,只是帮我建立我想使用的Redis从类名和密钥确保存储在redis集中的ID是单个值。正如你所看到的,在第二个get方法中,我用集合的结果调用第一个get方法。

实现生菜/反应器的第一个方法是很容易:

public <T extends CatalogInfo> Mono<String> getReactive(String id, Class<T> clazz, Publisher<String>... publisher) { 
    Mono<String> mono = Mono.just(id).flatMap(new Function<String, Mono<String>>() { 
     @Override 
     public Mono<String> apply(String id) { 
      return hashCommands.hget(CatalogUtils.root(clazz).getSimpleName(), id); 
     } 
    }); 
    return mono; 
} 

在这一点上,我可以调用mono.block()来获得结果值。我可以相当容易地由两个flatMaps /功能串联起来获得第二个get方法工作的功能:

public <T extends CatalogInfo> Flux<String> getIdFromSetReactive(String attName, String attValue, Class<T> clazz) { 
    String attKey = CatalogUtils.buildKey(CatalogUtils.root(clazz), attName, attValue); 

    Flux<String> flux = Flux.just(attKey).flatMap(new Function<String, Flux<String>>() { 
     @Override 
     public Flux<String> apply(String attKey) { 
      return commands.smembers(attKey); 
     } 
    }).flatMap(new Function<String, Publisher<String>>() { 
     @Override 
     public Publisher<String> apply(String id) { 
      return hashCommands.hget(CatalogUtils.root(clazz).getSimpleName(), id); 
     } 
    }); 
    return flux; 
} 

我有很多不同的类型,可能需要多达8个呼叫Redis的完成方法。在我最初的代码中,我可以重新使用每种方法并从另一个方法调用一种方法,但我无法弄清楚如何在reactor中执行此操作。

我希望能够调用一个构建Flux来从redis集合中获得ID的方法(我们称之为fluxA),然后调用另一个构建Flux的方法来查询基于ID的redis哈希值( fluxB)等

认为我可能需要定义每个功能,我可能需要作为成员变量是这样的:

private Function<String, Flux<String>> getIdFromSetFunction = new Function<String, Flux<String>>() { 
     @Override 
     public Flux<String> apply(String attKey) { 
      return commands.smembers(attKey); 
     } 
    }; 

,然后拨打电话一样

return Flux.just(attKey).flatMap(getIdFromSetFunction).flatMap(getHash); 

唯一的问题是在这些函数中执行的代码需要当前在我的方法调用中提供的类信息。但我不确定这是否正确。

任何意见将不胜感激!

回答

2

在概念上,您不是“为单个对象创作”,而是“为每个步骤创建一个新对象”。

Mono<String> a = Mono.just("something"); 
Mono<String> b = a.flatMap(s -> goDoSomethingElseThatReturnsAMono(s)); 

String result = b.block(); 

你可以像你想要的那样保持链接。 (或者如果您要分别处理多个数据项,则进入Flux世界,Mono.flatMapMany)。

直到block被调用,因为它订阅了b,并且阻塞当前线程直到结果可用。

+0

你一直在推动我作为新手进入被动世界。谢谢您的回答! – Bal