有效的JMS处理
回答
异步消息,尤其是当使用MDB时,背后的前提是每个消息是原子的。也就是说,处理任何一条消息的结果应该与处理任何其他消息的结果无关。您的问题的理想解决方案将保留这种消息的原子性。
如果您要在同一个工作单元中处理多个邮件,那么您将失去这种原子性。例如,假设您决定每25个消息同步一次。如果第25条消息出现错误,例如阻止从队列中检索到的代码页转换问题,则整批消息都将被退出。然后他们都会被重新递交。消息的重新传送计数会随着每个读取/撤销周期而增加。一旦重新传送计数超过了您的应用服务器中设置的阈值,所有25条消息都将被丢弃或重新发送,具体取决于您的配置。批次越大,在错误情况下可能影响的消息就越多,因为整个批次将一起存活或死亡。如果发生单一毒药信息,请将批量大小设置为100,并且100封邮件将面临风险。
另一种解决方案是允许MDB中有许多处理线程。使用JMS,您可以在同一连接下产生多个会话。每个会话都可以管理自己的工作单元,因此每个会话都可以独立启动XA事务,获取消息,更新数据库,然后提交事务。如果一条消息不好,则只有该消息和数据库更新会受到影响。
也有例外。例如,如果处理大批量并且所有消息都来自同一个生产者,则通常使用除MDB以外的其他东西来获取许多消息并在同一工作单元下更新许多行。同样,如果消息依赖于序列,那么并行处理是不可能的,因为它不会保留序列。但是再次,依赖序列的消息不是原子的。再次,在这种情况下,MDB并不是理想的解决方案。
根据您的传输提供者,支持的线程数量可能仅受内存存储的限制。例如,WebSphere MQ可以轻松处理队列中数百个同时存在的getter线程。检查您的应用服务器的MDB配置的调整,以查看您可以旋转多少个线程,然后验证您的传输是否可以处理负载。然后玩一下找到最佳的线程数。随着线程从一个线程增加,性能将显着增加,但只能达到一个点。过去这一点你通常会看到一个高原,然后随着线程管理开销的下降而抵消性能增益。 swe3et所在的位置取决于消息传递代理的加载程度以及它是否受CPU,内存,磁盘或网络的限制。
不要在每条消息上做,而是分批进行。 JMS像数据库一样支持事务;启动一个JMS事务,读取N个消息。启动数据库事务,插入N条消息。提交给JMS,提交给DB。
这显然引入了一个竞赛发生的窗口(两次提交之间发生崩溃)。你现在有这个,但只能用于一条消息。如果您想要解决这个问题,您需要查看XA事务(两个分阶段提交),或者至少需要某种重复的检测方案。对于一些介绍,看看:http://activemq.apache.org/should-i-use-xa.html
这里是一个jms处理器,它将从一个队列中取出消息,将它们添加到列表中,并推回到另一个队列。您可以调整的值被读取并聚集在各自的方法:
public class JmsBatcher<T> {
final Session session;
private final MessageConsumer consumer;
private final MessageProducer producer;
private final int batchSize;
public JmsBatcher(final Connection connection,
final String sourceQueue,
final String destQueue,
final int batchSize) throws JMSException {
this.batchSize = batchSize;
session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
final Queue source = session.createQueue(sourceQueue);
final Queue dest = session.createQueue(destQueue);
consumer = session.createConsumer(source);
producer = session.createProducer(dest);
}
public void processBatch() {
final List<T> values = new ArrayList<>();
try {
while (values.size() < batchSize) {
final Message message = consumer.receive();
values.add(readObject(message));
message.acknowledge();
}
producer.send(createAggregate(values));
session.commit();
} catch (Exception e) {
// Log the exception
try {
session.rollback();
} catch (JMSException re) {
// Rollback failed, so something fataly wrong.
throw new IllegalStateException(re);
}
}
}
private Message createAggregate(final List<T> values) throws JMSException {
return session.createObjectMessage((Serializable) values);
}
private T readObject(final Message message) throws JMSException {
return (T) ((ObjectMessage) message).getObject();
}
}
这可以在一个单独的线程启动了,只是一直运行下去:
final JmsBatcher jmsBatcher =
new JmsBatcher(connection, "single", "batch", 25);
new Thread(() -> {
while (true) {
jmsBatcher.processBatch();
}
}).start();
然后,您可以提交给数据库从成批的结果中批量分批。如果有任何失败,交易将被重试。
- 1. JBoss不处理JMS消息
- 2. Symfony3.1没有处理无效
- 3. 如何处理java.net.ConnectException有效
- 4. 有效地处理设置
- 5. 跟踪处理的jms消息
- 6. Java/JMS-处理处理失败场景的消息数
- 7. jms错误处理程序:只有当jms放弃时才会收到回调
- 8. JMS - 异步处理 - 处理父/子进程依赖关系
- 9. ActiveMQ/JMS消息处理程序测试
- 10. JMS with Spring Integration或Spring批处理
- 11. JMS与JPPF(Java并行处理)框架
- 12. OSB - JMS - 错误处理程序
- 13. 有效的键盘输入处理
- 14. 内存有效的XSLT处理器
- 15. 处理父/子的最有效方法
- 16. 查询处理有效的数值
- 17. 如何有效地处理WCF服务与错误处理
- 18. QT框架处理gzip有效载荷
- 19. 如何有效地处理PHP会话?
- 20. 批处理脚本不再有效?
- 21. 如何更有效地处理错误?
- 22. 如何有效处理用户角色?
- 23. 有效处理服务器负载
- 24. 有效处理推送通知
- 25. Hadoop或Postgresql进行有效处理
- 26. “事件处理”的效率
- 27. OpenProcess处理无效的ReadProcessMemory
- 28. 在触发器处理之前记录JMS队列中的所有xmls
- 29. 是否有更有效的方法来处理重复功能?
- 30. 是否有多个onmouseup事件处理程序有效的html?
回调方法“onMessage”一次只返回一条消息,所以我如何获取N条消息。 – changed 2011-03-08 23:26:22
我不会使用MessageListener接口,只会在收到消息时做些事情。你可以这样做(记录你通过成员变量接收了多少条消息,启动和提交事务等),但是你正在扩展你的竞争条件窗口,因为你依赖一条消息来触发任何操作。这真的不是最好的方法。在传统的循环中,你从队列中读取消息(阻塞超时或非阻塞呼叫)并且当你有N条消息或Y时间已经过去时进行提交。 – 2011-03-08 23:40:05
对不起 - 特别是使用MessageConsumer接口方法receive(timeout)和receiveNoWait(),而不是使用setMessageListener()注册MessageListener。 – 2011-03-08 23:52:45