2017-08-16 150 views
3

我想了解为什么我想用RabbitMQ的Spring云流。我看了一下RabbitMQ Spring教程4(https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html),这基本上是我想要做的。它创建一个直接与2个队列连接的交换机,并根据路由键将消息路由到Q1或Q2。春云流RabbitMQ

整个过程是非常简单的,如果你看教程,你创建所有的部分,绑定在一起,你准备好了。

我想知道使用Sing Cloud Stream会带来什么好处,如果这甚至是它的用例。很容易创建一个简单的交换,甚至定义目标和组是直接与流。所以我想为什么不去更进一步,并尝试用流处理教程案例。

我已经看到Stream有一个BinderAwareChannelResolver这似乎是做同样的事情。但我正在努力将它们放在一起,以达到与RabbitMQ Spring教程中相同的效果。我不知道这是否是一个依赖的问题,但我似乎从根本上误解了这里的东西,我想是这样的:

spring.cloud.stream.bindings.output.destination=myDestination 
spring.cloud.stream.bindings.output.group=consumerGroup 
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression='key' 

应的伎俩。

有没有人有一个源和接收器的基本创建直接交换,绑定2个队列的最小示例,并取决于路由关键路由到2个队列之一,如https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html

编辑

下面是一组最少的代码演示了如何做我问。我没有附上build.gradle,因为它是直线前进(但如果有人有兴趣,让我知道)

application.properties:建立生产者

spring.cloud.stream.bindings.output.destination=tut.direct 
spring.cloud.stream.rabbit.bindings.output.producer.exchangeType=direct 
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers.type 

Sources.class:建立生产者通道

public interface Sources { 

    String OUTPUT = "output"; 

    @Output(Sources.OUTPUT) 
    MessageChannel output(); 
} 

StatusController.class:响应其余呼叫并发送具有特定路由键的消息

/** 
* Status endpoint for the health-check service. 
*/ 
@RestController 
@EnableBinding(Sources.class) 
public class StatusController { 

    private int index; 

    private int count; 

    private final String[] keys = {"orange", "black", "green"}; 

    private Sources sources; 

    private StatusService status; 

    @Autowired 
    public StatusController(Sources sources, StatusService status) { 
     this.sources = sources; 
     this.status = status; 
    } 

    /** 
    * Service available, service returns "OK"'. 
    * @return The Status of the service. 
    */ 
    @RequestMapping("/status") 
    public String status() { 
     String status = this.status.getStatus(); 

     StringBuilder builder = new StringBuilder("Hello to "); 
     if (++this.index == 3) { 
      this.index = 0; 
     } 
     String key = keys[this.index]; 
     builder.append(key).append(' '); 
     builder.append(Integer.toString(++this.count)); 
     String payload = builder.toString(); 
     log.info(payload); 

     // add kv pair - routingkeyexpression (which matches 'type') will then evaluate 
     // and add the value as routing key 
     Message<String> msg = new GenericMessage<>(payload, Collections.singletonMap("type", key)); 
     sources.output().send(msg); 

     // return rest call 
     return status; 
    } 
} 
事情

消费者方面,性能:

spring.cloud.stream.bindings.input.destination=tut.direct 
spring.cloud.stream.rabbit.bindings.input.consumer.exchangeType=direct 
spring.cloud.stream.rabbit.bindings.input.consumer.bindingRoutingKey=orange 
spring.cloud.stream.bindings.inputer.destination=tut.direct 
spring.cloud.stream.rabbit.bindings.inputer.consumer.exchangeType=direct 
spring.cloud.stream.rabbit.bindings.inputer.consumer.bindingRoutingKey=black 

Sinks.class

public interface Sinks { 

    String INPUT = "input"; 

    @Input(Sinks.INPUT) 
    SubscribableChannel input(); 

    String INPUTER = "inputer"; 

    @Input(Sinks.INPUTER) 
    SubscribableChannel inputer(); 
} 

ReceiveStatus.class:接收状态:

@EnableBinding(Sinks.class) 
public class ReceiveStatus { 
    @StreamListener(Sinks.INPUT) 
    public void receiveStatusOrange(String msg) { 
     log.info("I received a message. It was orange number: {}", msg); 
    } 

    @StreamListener(Sinks.INPUTER) 
    public void receiveStatusBlack(String msg) { 
     log.info("I received a message. It was black number: {}", msg); 
    } 
} 

回答

3

春云流可以开发事件驱动的微服务应用程序通过使应用程序连接(通过@EnableBinding)到e使用Spring Cloud流绑定器实现的外部消息系统(Kafka,RabbitMQ,JMS绑定器等)。显然,Spring Cloud Stream使用Spring AMQP来实现RabbitMQ binder。

BinderAwareChannelResolver适用于动态绑定支持生产者,我认为你的情况是关于配置交换和消费者绑定到该交换。例如,根据您的标准以及具有您上面提到的属性(routing-key-expression,destination)的单个生产者(群组除外),您需要有2个消费者使用适当的bindingRoutingKey集合。我注意到你已经为出站频道配置了group。该group属性仅适用于消费者(因此入站)。

你可能也想检查一下:https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/issues/57,因为我看到使用routing-key-expression的一些讨论。具体而言,请使用表达式值检查this

+0

感谢您的回答。我已经看过提到的问题,他们是我真的问这个计算器问题的原因。对于其他人来说似乎很清楚的是,对于我来说它并不是。你对BinderAwareChannelResolver的解释证明我理解我到达了正确的角落:)。但是,我试图设置路由键表达式,但它不起作用。似乎像gradle的依赖问题,但我没有得到它的工作。这就是为什么我要求一个示例项目。 – maiksensi

+0

似乎我得到了消费方工作通过: 'spring.cloud.stream.bindings.input.destination = tut.direct spring.cloud.stream.rabbit.bindings.input.consumer.exchangeType = direct spring.cloud .stream.rabbit.bindings.input.consumer.bindingRoutingKey =橙 spring.cloud.stream.bindings.inputer.destination = tut.direct spring.cloud.stream.rabbit.bindings.inputer.consumer.exchangeType =直接 弹簧.cloud.stream.rabbit.bindings.inputer.consumer.bindingRoutingKey = black' 只剩下动态设置生产者端的路由密钥。 – maiksensi