2014-10-02 103 views
1

我有一个通过RabbitMQ发送AMQP消息的应用程序。消息发送是通过http请求触发的。最近我注意到一些消息似乎正在迷失(如从未交付)。我还注意到服务器管理的频道列表正在稳步增加。我已经纠正的第一件事是在不再需要频道后关闭频道。不过,我仍然不确定我的代码的结构是否正确,以确保交付。以下两段代码;第一个是管理连接的单例的一部分(不会在每次调用时重新创建),第二个是发送代码。任何意见/指导将不胜感激。RabbitMQ的可靠消息传递

@Service 
public class PersistentConnection { 
    private static Connection myConnection = null; 
    private Boolean blocked = false; 

    @Autowired ApplicationConfiguration applicationConfiguration; 
    @Autowired ConfigurationService configurationService; 

    @PostConstruct 
    private void init() { 
    } 

    @PreDestroy 
    private void destroy() { 
     try { 
      myConnection.close(); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    } 

    public Connection getConnection() { 
     if (myConnection == null) { 
      start(); 
     } 
     else if (!myConnection.isOpen()) { 
      log.warn("AMQP Connection closed. Attempting to start."); 
      start(); 
     } 
     return myConnection; 
    } 


    private void start() { 
     log.debug("Building AMQP Connection"); 

     ConnectionFactory factory = new ConnectionFactory(); 
     String ipAddress = applicationConfiguration.getAMQPHost(); 
     String password = applicationConfiguration.getAMQPUser(); 
     String user = applicationConfiguration.getAMQPPassword(); 
     String virtualHost = applicationConfiguration.getAMQPVirtualHost(); 
     String port = applicationConfiguration.getAMQPPort(); 

     try { 
      factory.setUsername(user); 
      factory.setPassword(password); 
      factory.setVirtualHost(virtualHost); 
      factory.setPort(Integer.parseInt(port)); 
      factory.setHost(ipAddress); 
      myConnection = factory.newConnection(); 
     } 
     catch (Exception e) { 
      e.printStackTrace(); 
     } 

     myConnection.addBlockedListener(new BlockedListener() { 
      public void handleBlocked(String reason) throws IOException { 
       // Connection is now blocked 
       blocked = true; 
      } 

      public void handleUnblocked() throws IOException { 
       // Connection is now unblocked 
       blocked = false; 
      } 
     }); 
    } 

    public Boolean isBlocked() { 
     return blocked; 
    } 
} 

/* 
* Sends ADT message to AMQP server. 
*/ 
private void send(String routingKey, String message) throws Exception { 
    String exchange = applicationConfiguration.getAMQPExchange(); 
    String exchangeType = applicationConfiguration.getAMQPExchangeType(); 

    Connection connection = myConnection.getConnection(); 
    Channel channel = connection.createChannel(); 
    channel.exchangeDeclare(exchange, exchangeType); 
    channel.basicPublish(exchange, routingKey, null, message.getBytes()); 

    // Close the channel if it is no longer needed in this thread 
    channel.close(); 
} 
+1

getConnection()从更多线程中调用吗?如果是,代码不是线程安全的。 – Gabriele 2014-10-02 08:20:36

+0

一个http请求最终会调用getConnection()调用。所以我想它可以。我想通过把它作为一个单例来解决这个问题。什么是改进代码的最佳方式? – skyman 2014-10-02 09:08:23

回答

2

试试这个代码:

@Service 
public class PersistentConnection { 
    private Connection myConnection = null; 
    private Boolean blocked = false; 

    @Autowired ApplicationConfiguration applicationConfiguration; 
    @Autowired ConfigurationService configurationService; 

    @PostConstruct 
    private void init() { 
     start(); /// In this way you can initthe connection and you are sure it is called only one time. 
    } 

    @PreDestroy 
    private void destroy() { 
     try { 
      myConnection.close(); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    } 

    public Connection getConnection() { 
     return myConnection; 
    } 


    private void start() { 
     log.debug("Building AMQP Connection"); 

     ConnectionFactory factory = new ConnectionFactory(); 
     String ipAddress = applicationConfiguration.getAMQPHost(); 
     String password = applicationConfiguration.getAMQPUser(); 
     String user = applicationConfiguration.getAMQPPassword(); 
     String virtualHost = applicationConfiguration.getAMQPVirtualHost(); 
     String port = applicationConfiguration.getAMQPPort(); 

     try { 
      factory.setUsername(user); 
      factory.setPassword(password); 
      factory.setVirtualHost(virtualHost); 
      factory.setPort(Integer.parseInt(port)); 
      factory.setHost(ipAddress); 
      myConnection = factory.newConnection(); 
     } 
     catch (Exception e) { 
      e.printStackTrace(); 
     } 

     myConnection.addBlockedListener(new BlockedListener() { 
      public void handleBlocked(String reason) throws IOException { 
       // Connection is now blocked 
       blocked = true; 
      } 

      public void handleUnblocked() throws IOException { 
       // Connection is now unblocked 
       blocked = false; 
      } 
     }); 
    } 

    public Boolean isBlocked() { 
     return blocked; 
    } 
} 

/* 
* Sends ADT message to AMQP server. 
*/ 
private void send(String routingKey, String message) throws Exception { 
    String exchange = applicationConfiguration.getAMQPExchange(); 
    String exchangeType = applicationConfiguration.getAMQPExchangeType(); 

    Connection connection = myConnection.getConnection(); 
    if (connection!=null){ 
    Channel channel = connection.createChannel(); 
    try{ 
    channel.exchangeDeclare(exchange, exchangeType); 
    channel.basicPublish(exchange, routingKey, null, message.getBytes()); 
    } finally{ 
     // Close the channel if it is no longer needed in this thread 
     channel.close(); 
    } 

}}

这可能是不够的,你的系统启动时有一个的RabbitMQ连接。

如果你是一个懒惰的单身人士,代码只是有点不同。

我建议不使用isOpen()方法,请阅读here

ISOPEN

布尔ISOPEN()确定组件是否是当前打开的。 如果我们正在关闭,将返回false。检查这种方法 应该只用于信息,因为竞争条件 - 状态 可以在通话后改变。而不是仅仅执行并试图抓住 ShutdownSignalException和IOException返回:真当组件 是打开的,否则为false

编辑**

问题1:

什么是你要找的是医管局客户。默认情况下

的RabbitMQ Java客户端不支持此功能,因为3.3.0版本仅支持重新连接,读取this

...允许基于Java的客户端网络 后自动连接失败。如果你想确定你的消息,你必须创建一个强大的客户端,能够抵抗所有失败。

通常您应该考虑失败,例如: 如果发布消息期间发生错误,会发生什么情况?

在你的情况下,你只是失去了消息,你应该手动重新排队消息。

问题2:

我不知道你的代码,但connection == null不应该发生,因为这个过程被称为为第一:

@PostConstruct 
    private void init() { 
     start(); /// In this way you can initthe connection and you are sure it is called only one time. 
    } 

反正你可以提出一个例外,这个问题是: 与我试图发送的消息有什么关系?

请看问题1

我想建议阅读更多有关HA,比如这个:

用rabbitmq创建一个可靠的系统并不复杂,但是你应该知道一些基本的概念。

无论如何..让我知道!

+0

非常感谢你。我将在今晚开始工作,明天加载代码并更新你。 – skyman 2014-10-02 10:09:54

+0

我刚开始看这个,我有几个问题。首先,如果RabbitMQ服务器重新启动会发生什么(例如) - 我假设连接将会失败,并且在服务器启动时不会自动重新连接?有没有办法自动重新连接?另外,我在想发送方法应该可能会抛出一个异常,如果连接== null - 这是否有意义? – skyman 2014-10-02 11:36:29

+1

@skyman我编辑答案 – Gabriele 2014-10-02 12:27:52