2016-11-24 43 views
0

我有一个MQ侦听器,它侦听消息并将状态更新为数据库。我有一个设置,其中Hibernate会话由Spring管理。执行Hibernate更新时的不一致性

以下是MQ监听器配置。

 <bean id="queue" class="com.ibm.mq.jms.MQQueue"> 
      <constructor-arg value="queuename" /> 
     </bean> 
     <bean id="listenerBean" class="com.mypackage.Listener"> 
       <property name="service" ref="myService" /> 
     </bean> 
     <bean id="listener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> 
       <constructor-arg><ref bean="listenerBean"/></constructor-arg> 
     </bean> 
     <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 
       <property name="connectionFactory" ref="connectionFactory" /> 
       <property name="destination" ref="queue" /> 
       <property name="messageListener" ref="listener" /> 
       <property name="exceptionListener" ref="exceptionListener" /> 
       <property name="sessionTransacted" value="true" /> 
       <property name="autoStartup" value="true" /> 
     </bean> 

在支持Java端来处理MQ消息,

public class Listener implements MessageDelegate{ 

     public MyService service; 

     @Override 
     public void handleMessage(Serializable message) { 
       service.process(message); 
     } 
} 

在服务类的工艺方法调用DAO方法来更新数据库。

以下是用于Spring sessionFactory Bean org.springframework.orm.hibernate3.LocalSessionFactoryBean的hibernate属性。

<property name="hibernateProperties"> 
      <props> 
        <prop key="hibernate.dialect">org.hibernate.dialect.Oracle10gDialect</prop> 
        <prop key="hibernate.cache.provider_class">org.hibernate.cache.EhCacheProvider</prop> 
        <prop key="hibernate.cache.use_second_level_cache">true</prop> 
        <prop key="hibernate.cache.use_query_cache">true</prop> 
        <prop key="hibernate.show_sql">false</prop> 
        <prop key="hibernate.generate_statistics">true</prop> 
        <prop key="hibernate.cache.provider_configuration_file_resource_path">ehcache_db_custom.xml</prop> 
      </props> 
    </property> 

该SessionFactory注入我的DAO豆,我得到使用sessionFactory.getCurrentSession()

使用这个session我使用标准API来获取豆基于独特的价值,我从MQ消息,并为得到我的会话我获得的独特结果是,我将状态更改为成功,然后再次呼叫sessionFactory.getCurrentSession()获得session,并致电session.update()更新数据库。

因为我使用hbm文件,所以bean上没有注释。它是一个简单的bean,没有一对多或多对一的映射。没有@Transactional注释。

下面是更新片段。我相信Spring会处理交易。

Session session = sessionFactory.getCurrentSession(); 
    if(session != null && bean != null) { 
     session.update(bean); 
    } 

我有DB成功更新和异常处理每当DB更新失败记录器(由我添加的是,log4j)。

这里是怪异的部分。

我碰到过多线程环境中的情况,其中日志不显示由休眠和日志引发的任何异常show DB已成功更新,但状态未更新。这只发生在一些记录中,并非全部。有记录被成功更新。我无法找到任何数据特定的问题。

当我执行一个使用JUnit无法更新的记录时,它成功更新。

有人可以让我知道,如果我错过配置结束了什么吗?

+0

您可以添加MQ监听器代码并告诉我们配置了多少监听器? – developer

+0

@javaguy在问题中更新了MQ侦听器代码。 8个JVM上共有8个监听器,分布在2台服务器上。 – Prabhat

回答

0

在您的应用程序中存在一个设计问题,即基本上,在多线程JMS应用程序中,当消费者正在使用多个线程进行侦听时,消息将失去订单(生产者已生产),您将发现不正确如果您没有仔细设计应用程序,那么数据库中的更新(没有任何例外,我们在应用程序的某个时间面临同样的问题)。

所以要解决这个问题,一种方法是,你需要从数据库端有一个业务逻辑来检查要更新的记录是否真的是正确的(你需要在一个事务中处理这个锁定)。例如,如果您的应用程序在products数据库表中(按递增顺序)持有number_of_products,那么您需要检查是否只有当新消息具有更高价值的产品时才应该执行新更新。

另一个解决方案是创建一个入口层,如适配器(应该是单线程的),并使用某些业务逻辑生成序列号(如每个product_type都会有一个序列),并在数据库中检查该序列号更新。如果序列号低于先前的值,则应忽略该消息,因为它不是最新的消息,即最新消息已更新到数据库。

+0

基本上,从MQ消息收到的唯一标识是过程的一部分。此记录插入到1或2天前的DB中,状态为'New',所以它肯定应该存在于DB中。我不介意MQ消息被消费的顺序,只要相应记录的状态更新为'已完成' – Prabhat

+0

除了新建和完成之外,还有哪些其他状态? – developer

+0

有'进行中',但这是特定于应用程序。基本上,在更改为“新建”之后,会有一些过程,之后它会更新为“进行中”,然后会有延迟。然后应用程序从MQ获得一条消息,将状态更新为'Completed'。即使已收到来自MQ的消息,我也会看到一些具有“进行中”状态的记录。 – Prabhat