2013-04-05 70 views
0

我构建了一个基于JBoss 7.1.1和外部HornetQ实现2.2.1.4构建的Spring JMS解决方案。这连接并成功地工作。JBoss EAP6 + HornetQ - 不确定如何创建到HornetQ的队列连接

但是我现在使用EAP6,并试图连接到EAP6内部打包的内部HornetQ。

我有几个类来管理连接和创建队列。但是,这似乎并不需要连接到打包的HornetQ - 可以很好地连接到外部的HornetQ。

我已经提出这与红帽,他们不知道如何解决,因为这也需要春季编码。

我的问题是,我认为,我需要创建一个QueueConnection,如QueueConnection qcon = queueConnectionFactory.createQueueConnection("user","password");

但我们在春季实施的方式,是我们使用Spring JmsTemplate的,并增加了队列的概念连接到此不可用,所以它不工作。

下面是包含所需的Spring bean的JMS-的services.xml文件:

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:task="http://www.springframework.org/schema/task" xmlns:tx="http://www.springframework.org/schema/tx" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
         http://www.springframework.org/schema/beans/spring-beans.xsd 
         http://www.springframework.org/schema/context 
         http://www.springframework.org/schema/context/spring-context.xsd 
         http://www.springframework.org/schema/task 
         http://www.springframework.org/schema/task/spring-task.xsd 
         http://www.springframework.org/schema/tx 
         http://www.springframework.org/schema/tx/spring-tx-3.0.xsd"> 

    <context:annotation-config /> 

    <context:component-scan base-package="com.myproject.test" /> 

    <task:annotation-driven /> 

    <bean id="testTransactionManager" 
     class="org.springframework.transaction.jta.JtaTransactionManager"> 
     <property name="transactionManagerName" value="java:/TransactionManager"></property> 
     <property name="autodetectUserTransaction" value="false"></property> 
    </bean> 

    <tx:annotation-driven transaction-manager="testTransactionManager" /> 

    <bean id="queueConnectionFactory" class="com.myproject.test.impl.QueueConnectionFactoryImpl"> 
     <constructor-arg type="String" name="host" value="231.7.7.7" /> 
     <constructor-arg type="int" name="port" value="9876" /> 
     <constructor-arg type="boolean" name="useJta" value="true" /> 
     <constructor-arg type="boolean" name="useCluster" value="true" /> 
    </bean> 

    <bean id="testQueueManager" class="com.myproject.test.impl.QueueManagerImpl"> 
     <constructor-arg ref="queueConnectionFactory" /> 
     <constructor-arg name="queue" value="TestQueue" /> 
    </bean> 

</beans> 

这是我QueueConnectionFactoryImpl类:

package com.myproject.test.impl; 

import java.util.HashMap; 
import java.util.Map; 

import javax.jms.ConnectionFactory; 
import javax.jms.JMSException; 

import org.hornetq.api.core.TransportConfiguration; 
import org.hornetq.api.jms.HornetQJMSClient; 
import org.hornetq.api.jms.JMSFactoryType; 
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory; 
import org.hornetq.core.remoting.impl.netty.TransportConstants; 
import org.jboss.logging.Logger; 

import com.myproject.test.QueueConnectionFactory; 

public class QueueConnectionFactoryImpl implements QueueConnectionFactory { 

    private String host; 
    private int port; 
    private ConnectionFactory connectionFactory; 
    private Logger logger; 
    private boolean useJta = false; 

    public QueueConnectionFactoryImpl(String host, int port, boolean useJta) 
    { 
     this.useJta = useJta; 
     createConnection(host, port); 
    } 

    public QueueConnectionFactoryImpl(String host, int port) { 

     createConnection(host, port); 
    } 

    public QueueConnectionFactoryImpl(String host, int port, boolean useJta, boolean useCluster) 
    { 
     this.useJta = useJta; 
     if(useCluster) 
      createClusterConnection(host, port); 
     else 
      createConnection(host, port); 
    } 

    private void createConnection(String host, int port) { 

     logger = Logger.getLogger(this.getClass()); 

     this.host = host; 
     this.port = port; 

     Map<String, Object> connectionParams = new HashMap<String, Object>(); 
     connectionParams.put(TransportConstants.PORT_PROP_NAME, port); 
     connectionParams.put(TransportConstants.HOST_PROP_NAME, host); 

     TransportConfiguration transportConfiguration = new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams); 

     JMSFactoryType jmsFType = JMSFactoryType.CF; 

     if(useJta) 
      jmsFType = JMSFactoryType.XA_CF; 

     connectionFactory = (ConnectionFactory) HornetQJMSClient.createConnectionFactoryWithoutHA(jmsFType, transportConfiguration); 

    } 

    private void createClusterConnection(String host, int port) 
    { 
     logger = Logger.getLogger(this.getClass()); 

     this.host = host; 
     this.port = port; 

     JMSFactoryType jmsFType = JMSFactoryType.CF; 

     if(useJta) 
     jmsFType = JMSFactoryType.XA_CF; 

     connectionFactory = (ConnectionFactory) HornetQJMSClient.createConnectionFactoryWithHA(new DiscoveryGroupConfiguration(host, port), jmsFType); 

    } 

    public QueueConnectionFactoryImpl(Object connectionFactory) 
    { 
     logger = Logger.getLogger(this.getClass()); 
     logger.debug("Object is: "+connectionFactory); 
    } 

    public String getHost() { 
     return host; 
    } 

    public void setHost(String host) { 
     this.host = host; 
    } 

    public int getPort() { 
     return port; 
    } 

    public void setPort(int port) { 
     this.port = port; 
    } 

    public ConnectionFactory getConnectionFactory() { 
     return connectionFactory; 
    } 

    public void setConnectionFactory(ConnectionFactory connectionFactory) { 
     this.connectionFactory = connectionFactory; 
    } 

    public boolean isUseJta() { 
     return useJta; 
    } 

    public void setUseJta(boolean useJta) { 
     this.useJta = useJta; 
    } 

} 

这是我QueueManagerImpl代码

package com.myproject.test.impl; 

import javax.jms.ConnectionFactory; 
import javax.jms.DeliveryMode; 
import javax.jms.Queue; 

import org.apache.log4j.Logger; 
import org.hornetq.api.jms.HornetQJMSClient; 
import org.springframework.jms.core.JmsTemplate; 

import com.myproject.test.QueueManager; 

public class QueueManagerImpl implements QueueManager { 

    private String queue; 
    private ConnectionFactory connectionFactory; 
    private JmsTemplate template; 
    private Queue jmsQueue; 
    private boolean useJta = false; 
    private static final Logger log = Logger.getLogger(QueueManagerImpl.class); 

    public QueueManagerImpl(QueueConnectionFactory queueConnectionFactory) { 

     template = new JmsTemplate(); 
     connectionFactory = queueConnectionFactory.getConnectionFactory(); 
     try 
     { 
      this.setUseJta(queueConnectionFactory.isUseJta()); 
      template.setConnectionFactory(connectionFactory); 
      template.setExplicitQosEnabled(true); 
      template.setDeliveryMode(DeliveryMode.PERSISTENT); 
      if(queueConnectionFactory.isUseJta()) 
       template.setSessionTransacted(true); 
     } 
     catch(Exception ex) 
     { 
      logError(ex.toString()); 
     } 
    } 

    public QueueManagerImpl(QueueConnectionFactory queueConnectionFactory, String queue) { 

     this(queueConnectionFactory); 
     setQueue(queue); 
    } 

    public String getQueue() { 

     return queue; 
    } 

    public void setQueue(String queue) { 

     try 
     { 
      jmsQueue = HornetQJMSClient.createQueue(queue); 
      template.setDefaultDestination(jmsQueue); 
      this.queue = queue; 
     } 
     catch(Exception ex) 
     { 
      logError(ex.toString()); 
     } 
    } 

    public JmsTemplate getTemplate() { 
     return template; 
    } 

    public void logError(String error) 
    { 
     String details = String.format("Unable to connect to queue, details: %s ", error); 
     String errorMessage = String.format("error...", details); 
     log.error(errorMessage); 
    } 

    @Override 
    public boolean isUseJta() { 
     return useJta; 
    } 

    @Override 
    public void setUseJta(boolean useJta) { 
     this.useJta = useJta; 
    } 
} 

最主要的是,上面的代码需要ConenctionFactory对象被传递到JmsTemplate的在QueueManagerImpl - template.setConnectionFactory(connectionFactory的);.

我已经尝试了多种方法可以得到这个工作:

1)添加以下的JSM-service.xml中文件:

<bean id="myConnectionFactory" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter"> 
    <property name="targetConnectionFactory" ref="queueConnectionFactory"/> 
    <property name="username" value="myuser"/> 
    <property name="password" value="myuser123"/> 
</bean>  

这就产生以下异常:

java.lang.IllegalStateException:不能类型的值转换[com.myproject.test.impl.QueueConnectionFactoryImpl]至所需的类型[javax.jms.ConnectionFactory]财产“targetConnectionFactory”:没有匹配编辑或转换战略发现

2)改变在QueueConnectionFactoryImpl到连接:

org.hornetq.jms.client.HornetQConnectionFactory HQConnectionFactory = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactoryWithoutHA(jmsFType, transportConfiguration); 

try { 
    connectionFactory = (ConnectionFactory) HQConnectionFactory.createConnection("myuser","myuser123"); 
} catch (JMSException e) { 
    // TODO Auto-generated catch block 
    e.printStackTrace(); 
} 

这也不起作用。我得到一个例外:

java.lang.ClassCastException:org.hornetq.jms.client.HornetQConnection不能转换到的javax.jms。连接工厂


总之,任何人都可以请让我上面的代码以某种方式提供用户名和密码,这样我仍然可以使用JmsTemplate的连接到HornetQ的的方法帮助。

回答

1

只需使用您自己的连接工厂。它的工作对我来说:

春豆(5445是HornetQ的受体端口):

<bean name="jmsConnectionFactory" class="messaging.jms.CustomHornetQJMSConnectionFactory"> 
    <constructor-arg name="ha" value="false" /> <!-- set true if you want support failover --> 
    <constructor-arg name="commaSepratedServerUrls" value="127.0.0.1:5445" /> 
    <property name="username" value="admin" /> 
    <property name="password" value="admin" /> 
</bean> 

连接工厂实现(从HornetQ中的JMS客户端和TransportConfiguration从HornetQ的核心客户端使用HornetQJMSConnectionFactory):现在

public class CustomHornetQJMSConnectionFactory extends org.hornetq.jms.client.HornetQJMSConnectionFactory 
{ 
    private static final long serialVersionUID = 1L; 

    private String username; 
    private String password; 

    public CustomHornetQJMSConnectionFactory(boolean ha, String commaSepratedServerUrls) 
    { 
     super(ha, converToTransportConfigurations(commaSepratedServerUrls)); 
    } 

    public static TransportConfiguration[] converToTransportConfigurations(String commaSepratedServerUrls) 
    { 
     String [] serverUrls = commaSepratedServerUrls.split(","); 
     TransportConfiguration[] transportconfigurations = new TransportConfiguration[serverUrls.length]; 
     for(int i = 0; i < serverUrls.length; i++) 
     { 
      String[] urlParts = serverUrls[i].split(":"); 
      HashMap<String, Object> map = new HashMap<String,Object>(); 
      map.put(TransportConstants.HOST_PROP_NAME, urlParts[0]); 
      map.put(TransportConstants.PORT_PROP_NAME, urlParts[1]); 
      transportconfigurations[i] = new TransportConfiguration(NettyConnectorFactory.class.getName(), map); 
     } 
     return transportconfigurations; 
    } 

    @Override 
    public Connection createConnection() throws JMSException 
    { 
     return super.createConnection(username, password); 
    } 
    public String getUsername() { 
     return username; 
    } 
    public void setUsername(String username) { 
     this.username = username; 
    } 
    public String getPassword() { 
     return password; 
    } 
    public void setPassword(String password) { 
     this.password = password; 
    } 
} 

,如果你把这个连接工厂JmsTemplate时,你可以使用用户名/密码发送/消费消息

相关问题