2016-11-18 115 views
1

我试图创建2个简单的应用程序;一个是将消息发布到RabbitMQ频道,另一个是从频道接收它并将其打印出控制台。发件人应用程序立即启动并发布10条消息。春季集成AMQP和RabbitMQ丢失和“unacked”消息

我在客户端控制台上看到的只有大约一半的消息打印出来。 当我检查RabbitMQ Web客户端时,我还会看到其中一条消息始终处于“未定义”状态。

当我阅读文档时,据我了解“amqp入站/出站网关”是实现此目的的一种简单方法。 你能帮我理解为什么我丢失了一些消息,并且一个人坐在“未吃”状态? 另外,我应该如何改变它以获得另一方的所有消息?

预先感谢您。

下面是XML配置和文件上发送侧:

integrationContext.xml

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:int="http://www.springframework.org/schema/integration" 
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp" 
    xmlns:rabbit="http://www.springframework.org/schema/rabbit" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans.xsd 
    http://www.springframework.org/schema/integration 
    http://www.springframework.org/schema/integration/spring-integration.xsd 
    http://www.springframework.org/schema/integration/amqp 
    http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd 
    http://www.springframework.org/schema/rabbit 
    http://www.springframework.org/schema/rabbit/spring-rabbit.xsd 
    http://www.springframework.org/schema/context 
    http://www.springframework.org/schema/context/spring-context.xsd"> 

<!-- Configuration for Component Scan --> 
<context:component-scan base-package="com.amqp.sample" /> 

<context:property-placeholder location="classpath*:rabbitmq.properties"/> 

<int:gateway id="taskGateway" service-interface="com.amqp.sample.TaskGateway" default-request-channel="processChannel" /> 
<int-amqp:channel id="processChannel" 
    connection-factory="connectionFactory" 
    message-driven="true" 
    queue-name="ha.rabbit.channel" /> 

<!-- RabbitMQ Connection Factory --> 
<rabbit:connection-factory id="connectionFactory" 
    addresses="${rabbitmq.addresses}" 
    username="${rabbitmq.username}" 
    password="${rabbitmq.password}" /> 

<rabbit:template id="amqpTemplate" 
    connection-factory="connectionFactory" 
    reply-timeout="-1" /> 

<rabbit:admin connection-factory="connectionFactory" /> 


<int-amqp:outbound-gateway request-channel="processChannel" 
          reply-channel="processChannel" 
          reply-timeout="-1" /> 

</beans> 

TaskGateway.java

import org.springframework.messaging.Message; 

import com.amqp.sample.model.Task; 

public interface TaskGateway { 

    void processTaskRequest(Message<Task> message); 
} 

Task.java

import java.io.Serializable; 

public class Task implements Serializable { 

    private static final long serialVersionUID = -2138235868650860555L; 
    private int id; 
    private String name; 

    public int getId() { 
     return id; 
    } 

    public void setId(int id) { 
     this.id = id; 
    } 

    public String getName() { 
     return name; 
    } 

    public void setName(String name) { 
     this.name = name; 
    } 

    public Task(int id, String name) { 
     this.id = id; 
     this.name = name; 
    } 

    @Override 
    public String toString() { 
     return "Task [id=" + id + ", name=" + name + "]"; 
    } 

} 

Application.Java

@PropertySources({ 
@PropertySource("classpath:application.properties"), 
}) 
@EnableConfigurationProperties 
@ComponentScan 
@EnableAutoConfiguration 
@ImportResource("classpath:integrationContext.xml") 
public class Application extends SpringBootServletInitializer { 
    public static final Logger logger = LoggerFactory.getLogger(Application.class); 

    private static TaskGateway taskGateway; 

    public static void main(String[] args) { 
    ApplicationContext context=SpringApplication.run(Application.class, args); 

    taskGateway = context.getBean(TaskGateway.class); 
    for(int i=0; i<10; i++){ 
     Message<Task> message = MessageBuilder.withPayload(getTask(i)).build(); 
     taskGateway.processTaskRequest(message); 
    } 
    } 

    /** 
    * Creates a sample task returns. 
    * 
    * @return Task 
    */ 
    private static Task getTask(final int id) { 
     return new Task(id, "Task with ID:" + id); 
    } 

    @Override 
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) { 
    return application.sources(Application.class); 
    } 

} 

而且,这里有对接收方的文件:

integrationContext.xml

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:int="http://www.springframework.org/schema/integration" 
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp" 
    xmlns:rabbit="http://www.springframework.org/schema/rabbit" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans.xsd 
    http://www.springframework.org/schema/integration 
    http://www.springframework.org/schema/integration/spring-integration.xsd 
    http://www.springframework.org/schema/integration/amqp 
    http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd 
    http://www.springframework.org/schema/rabbit 
    http://www.springframework.org/schema/rabbit/spring-rabbit.xsd 
    http://www.springframework.org/schema/context 
    http://www.springframework.org/schema/context/spring-context.xsd"> 

    <!-- Configuration for Component Scan --> 
    <context:component-scan base-package="com.amqp.sample" /> 

    <context:property-placeholder location="classpath*:rabbitmq.properties"/> 

    <!-- RabbitMQ Connection Factory --> 
    <rabbit:connection-factory id="connectionFactory" 
     addresses="${rabbitmq.addresses}" 
     username="${rabbitmq.username}" 
     password="${rabbitmq.password}" /> 

    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/> 
    <rabbit:admin connection-factory="connectionFactory" /> 

    <int:channel id="inputChannel"/> 

    <int-amqp:inbound-gateway request-channel="inputChannel" reply-channel="inputChannel" 
    queue-names="ha.rabbit.channel" 
    connection-factory="connectionFactory" 
    amqp-template="amqpTemplate"/> 

    <int:service-activator input-channel="inputChannel" ref="taskProcessService" method="process" /> 

</beans> 

ProcessService.java

import org.springframework.messaging.Message; 

public interface ProcessService<T> { 

    /** 
    * Processes incoming message(s) 
    * 
    * @param message SI Message. 
    */ 
    void process(Message<T> message); 

} 

TaskProcessService

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.messaging.Message; 
import org.springframework.stereotype.Component; 

import com.amqp.sample.model.Task; 

@Component("taskProcessService") 
public class TaskProcessService implements ProcessService<Task> { 

    private final Logger logger = LoggerFactory.getLogger(TaskProcessService.class); 

    @Override 
    public void process(Message<Task> message) { 

    logger.info("Received Message : " + message.getPayload()); 
    } 

} 

Application.java

@PropertySources({ 
@PropertySource("classpath:application.properties"), 
}) 
@EnableConfigurationProperties 
@ComponentScan 
@EnableAutoConfiguration 
@ImportResource("classpath:integrationContext.xml") 
public class Application extends SpringBootServletInitializer { 
    public static final Logger logger = LoggerFactory.getLogger(Application.class); 


    public static void main(String[] args) { 
    ApplicationContext context = SpringApplication.run(Application.class, args); 
    } 

    @Override 
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) { 
    return application.sources(Application.class); 
    } 

} 

回答

0

首先,网关用于请求/回复场景;由于您的客户预计没有响应,并且该服务没有返回,您应该使用通道适配器而不是网关。尝试一下,如果你仍然遇到麻烦,请回来。

编辑

@SpringBootApplication 
@IntegrationComponentScan 
public class So40680673Application { 

    public static void main(String[] args) { 
     ConfigurableApplicationContext context = SpringApplication.run(So40680673Application.class, args); 
     FooGate gate = context.getBean(FooGate.class); 
     for (int i = 0; i < 10; i++) { 
      System.out.println(gate.exchange("foo" + i)); 
     } 
     context.close(); 
    } 

    @MessagingGateway(defaultRequestChannel = "out.input") 
    public interface FooGate { 

     String exchange(String out); 
    } 

    @Bean 
    public IntegrationFlow out(AmqpTemplate amqpTemplate) { 
     return f -> f.handle(Amqp.outboundGateway(amqpTemplate).routingKey(queue().getName())); 
    } 

    @Bean 
    public IntegrationFlow in(ConnectionFactory connectionFactory) { 
     return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, queue().getName())) 
       .<String, String>transform(String::toUpperCase) 
       .get(); 
    } 

    @Bean 
    public Queue queue() { 
     return new AnonymousQueue(); 
    } 

} 
+0

谢谢你的回复,加里。实际上,我打算稍后记录发送方收到的消息。这是我选择网关的原因之一。你能帮我解决网关的代码吗? – turgos

+0

客户端的回复超时时间为-1('RabbitTemplate');你的调用线程将永远等待一个永远不会到来的答复。一旦你解决了这个问题,我建议你打开DEBUG日志记录,并遵循客户端和服务器中的消息。我不准备帮助调试伪造配置;抱歉。 –

+0

我明白了。我将更改回复超时并从那里开始调试。同时,你是否有任何简单的集成-amqp与rabbitmq示例代码/配置使用网关,我可以用它作为示例?再次感谢你的帮助。 – turgos