2011-03-08 48 views
8

我们有一个接收大量消息的JMS队列。有效的JMS处理

监听器必须使用数据库事务将消息保存在数据库中,然后提交JMS事务。

那么我怎么能更有效地做到这一点,我不必做数据库& JMS提交每封邮件。

回答

6

异步消息,尤其是当使用MDB时,背后的前提是每个消息是原子的。也就是说,处理任何一条消息的结果应该与处理任何其他消息的结果无关。您的问题的理想解决方案将保留这种消息的原子性。

如果您要在同一个工作单元中处理多个邮件,那么您将失去这种原子性。例如,假设您决定每25个消息同步一次。如果第25条消息出现错误,例如阻止从队列中检索到的代码页转换问题,则整批消息都将被退出。然后他们都会被重新递交。消息的重新传送计数会随着每个读取/撤销周期而增加。一旦重新传送计数超过了您的应用服务器中设置的阈值,所有25条消息都将被丢弃或重新发送,具体取决于您的配置。批次越大,在错误情况下可能影响的消息就越多,因为整个批次将一起存活或死亡。如果发生单一毒药信息,请将批量大小设置为100,并且100封邮件将面临风险。

另一种解决方案是允许MDB中有许多处理线程。使用JMS,您可以在同一连接下产生多个会话。每个会话都可以管理自己的工作单元,因此每个会话都可以独立启动XA事务,获取消息,更新数据库,然后提交事务。如果一条消息不好,则只有该消息和数据库更新会受到影响。

也有例外。例如,如果处理大批量并且所有消息都来自同一个生产者,则通常使用除MDB以外的其他东西来获取许多消息并在同一工作单元下更新许多行。同样,如果消息依赖于序列,那么并行处理是不可能的,因为它不会保留序列。但是再次,依赖序列的消息不是原子的。再次,在这种情况下,MDB并不是理想的解决方案。

根据您的传输提供者,支持的线程数量可能仅受内存存储的限制。例如,WebSphere MQ可以轻松处理队列中数百个同时存在的getter线程。检查您的应用服务器的MDB配置的调整,以查看您可以旋转多少个线程,然后验证您的传输是否可以处理负载。然后玩一下找到最佳的线程数。随着线程从一个线程增加,性能将显着增加,但只能达到一个点。过去这一点你通常会看到一个高原,然后随着线程管理开销的下降而抵消性能增益。 swe3et所在的位置取决于消息传递代理的加载程度以及它是否受CPU,内存,磁盘或网络的限制。

8

不要在每条消息上做,而是分批进行。 JMS像数据库一样支持事务;启动一个JMS事务,读取N个消息。启动数据库事务,插入N条消息。提交给JMS,提交给DB。

这显然引入了一个竞赛发生的窗口(两次提交之间发生崩溃)。你现在有这个,但只能用于一条消息。如果您想要解决这个问题,您需要查看XA事务(两个分阶段提交),或者至少需要某种重复的检测方案。对于一些介绍,看看:http://activemq.apache.org/should-i-use-xa.html

+0

回调方法“onMessage”一次只返回一条消息,所以我如何获取N条消息。 – changed 2011-03-08 23:26:22

+3

我不会使用MessageListener接口,只会在收到消息时做些事情。你可以这样做(记录你通过成员变量接收了多少条消息,启动和提交事务等),但是你正在扩展你的竞争条件窗口,因为你依赖一条消息来触发任何操作。这真的不是最好的方法。在传统的循环中,你从队列中读取消息(阻塞超时或非阻塞呼叫)并且当你有N条消息或Y时间已经过去时进行提交。 – 2011-03-08 23:40:05

+1

对不起 - 特别是使用MessageConsumer接口方法receive(timeout)和receiveNoWait(),而不是使用setMessageListener()注册MessageListener。 – 2011-03-08 23:52:45

0

这里是一个jms处理器,它将从一个队列中取出消息,将它们添加到列表中,并推回到另一个队列。您可以调整的值被读取并聚集在各自的方法:

public class JmsBatcher<T> { 
    final Session session; 
    private final MessageConsumer consumer; 
    private final MessageProducer producer; 
    private final int batchSize; 


    public JmsBatcher(final Connection connection, 
         final String sourceQueue, 
         final String destQueue, 
         final int batchSize) throws JMSException { 
     this.batchSize = batchSize; 
     session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); 
     final Queue source = session.createQueue(sourceQueue); 
     final Queue dest = session.createQueue(destQueue); 
     consumer = session.createConsumer(source); 
     producer = session.createProducer(dest); 
    } 

    public void processBatch() { 
     final List<T> values = new ArrayList<>(); 
     try { 
      while (values.size() < batchSize) { 
       final Message message = consumer.receive(); 
       values.add(readObject(message)); 
       message.acknowledge(); 
      } 
      producer.send(createAggregate(values)); 
      session.commit(); 
     } catch (Exception e) { 
      // Log the exception 
      try { 
       session.rollback(); 
      } catch (JMSException re) { 
       // Rollback failed, so something fataly wrong. 
       throw new IllegalStateException(re); 
      } 
     } 
    } 

    private Message createAggregate(final List<T> values) throws JMSException { 
     return session.createObjectMessage((Serializable) values); 
    } 

    private T readObject(final Message message) throws JMSException { 
     return (T) ((ObjectMessage) message).getObject(); 
    } 
} 

这可以在一个单独的线程启动了,只是一直运行下去:

final JmsBatcher jmsBatcher = 
    new JmsBatcher(connection, "single", "batch", 25); 
new Thread(() -> { 
    while (true) { 
     jmsBatcher.processBatch(); 
    } 
}).start(); 

然后,您可以提交给数据库从成批的结果中批量分批。如果有任何失败,交易将被重试。