2017-08-31 116 views
0

我们有一个java web应用程序,它通过JMS发送(JobsController.java)并接收消息(JMSMessageListener.java)。恒定负载下运行的应用程序24小时,采取堆转储后,我观察内存使用量的不断增加,应用程序不放手,当处于空闲状态。我知道这会导致java堆内存不足的问题。Wildfly 10.1.0.FINAL上的内存泄漏(java.lang.ref.Finalizer/ActiveMQConnection)

JobsController是一个EJB无状态Bean和它的资源每次通话后正确销毁。 JMSMessageListener得到由EJB全球Bean池处理,它的实例被重用。

我可以从Java堆转储看到的嫌疑人

  1. EJB豆注射引起内存泄漏 https://blog.akquinet.de/2017/01/04/dont-get-trapped-into-a-memory-leak-using-cdi-instance-injection/
  2. ActiveMQConnection.finalize()。如果它比它必须 发生在所有这些wildfly的ActiveMQ部署的罪魁祸首。任何暗示是 赞赏。

ActiveMQConnection.java

@Override 
protected final void finalize() throws Throwable { 
    if (!closed) { 
     if (this.factoryReference.isFinalizeChecks()) { 
      ActiveMQJMSClientLogger.LOGGER.connectionLeftOpen(creationStack); 
     } 
     close(); 
} 

enter image description here enter image description here enter image description here

JobsController

@Stateless
公共类JobsController {

@Inject 
private JMSContext jmsContext; 
private Connection connection; 
private Session session; 
private MessageProducer jmsProducer; 

@Resource(lookup = "java:/ConnectionFactory") 
private ConnectionFactory connectionFactory; 

@Resource(lookup = JAVA_JMS_JOB_QUEUE) 
private Queue jobQueue; 

@Resource(lookup = JAVA_JMS_QUEUE) 
private Queue progressQueue; 

@PreDestroy 
void release() { 
    try { 
     if (jmsProducer != null) { 
      jmsProducer.close(); 
     } 
     if (session != null) { 
      session.close(); 
     } 
     if (jmsContext != null) { 
      jmsContext.close(); 
     } 
     if (connection !=null) { 
      connection.close(); 
     } 
    } catch (JMSException e) { 
     LOG.warn("failed to close JMS resources: {}", e.getMessage()); 
    } 
} 

public synchronized MessageProducer getJmsProducer() { 
    if (jmsProducer == null) { 
     try { 
      connection = connectionFactory.createConnection(); 
      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
      jmsProducer = session.createProducer(jobQueue); 


      connection.start(); 
     } catch (JMSException e) { 
      LOG.error("failed to setup JMS message producer: {}", e.getMessage()); 
     } 
    } 
    return jmsProducer; 
} 
public void addMessageToProgressQueue(ProgressMessage progressMessage) { 
    ObjectMessage objectMessage = jmsContext.createObjectMessage(progressMessage); 
    try { 
     getJmsProducer().send(progressQueue, objectMessage); 
    } catch (JMSException e) { 
     LOG.error("failed to send progress message {}: {}", objectMessage, e.getMessage()); 
    } 
} 

}

JMSMessageListener.java

@MessageDriven(name = "JMSMessageListener", mappedName = JAVA_JMS_QUEUE, activationConfig = { 
     @ActivationConfigProperty( 
       propertyName = "acknowledgeMode", 
       propertyValue = "Auto-acknowledge"), 
     @ActivationConfigProperty( 
       propertyName = "destinationType", 
       propertyValue = "javax.jms.Queue"), 
     @ActivationConfigProperty( 
       propertyName = "destination", 
       propertyValue = JAVA_JMS_QUEUE) 


}) 
public class JMSMessageListener implements MessageListener { 

    private static Logger LOG = LoggerFactory.getLogger(JMSMessageListener.class); 

    @EJB 
    private JobsController jobsController; 

    private final ObjectMapper progressMessageMapper; 

    public JMSMessageListener() { 
     progressMessageMapper = new ObjectMapper(); 
     progressMessageMapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true); 
    } 

    @Override 
    public void onMessage(Message message) { 
     ProgressMessage progressMessage = null; 
     try { 
      if (message instanceof BytesMessage) { 
       BytesMessage bytesMessage = (BytesMessage) message; 
       int TEXT_LENGTH = new Long(bytesMessage.getBodyLength()).intValue(); 
       byte[] textBytes = new byte[TEXT_LENGTH]; 
       bytesMessage.readBytes(textBytes, TEXT_LENGTH); 


       String progressText = new String(textBytes, "UTF-8"); 

       progressText = progressText.replaceAll("'totalSteps': None", "'totalSteps': 0"); 
       progressMessage = progressMessageMapper.readValue(progressText, ProgressMessage.class); 
      } else if (message instanceof ObjectMessage) { 
       progressMessage = message.getBody(ProgressMessage.class); 
      } 
      if (progressMessage != null) { 

       jobsController.sendProgressMessage(progressMessage); 
      } else { 
       LOG.error("An empty progress message was received"); 
      } 
     } catch (JMSException | IOException e) { 
      LOG.error("failed to process progress message: {}", e.getMessage(), e); 
     } 
    } 
} 

回答

1

几件事情:

  • 你注入JMSContext但从来没有使用它(至少在你粘贴的代码中)。这似乎是一个错误。
  • 如果您不打算使用注入的JMSContext,而是使用注入的ConnectionFactory,那么您应该注入“java:/ JmsXA”而不是“java:/ ConnectionFactory”,因为它是&tl; pooled-connection-factory> 。为使用“java:/ ConnectionFactory”发送的每条消息创建一个连接是一种反模式,因为它没有被合并。另外,我想你想使用一个XA事务,这样的消息消费和你的MDB发送是原子,并且不会“的java:/ ConnectionFactory的”工作。
+0

嗨@Justin,谢谢您的建议。我正在尝试使用“java:/ JmsXA”而不是“java:/ ConnectionFactory”的建议,并且正在运行一些压力测试,以查看它是否仍然泄漏。我们使用JMSContext(将其添加到类JobsController中)。 – tchoesang