2016-11-10 103 views
6

我们有一个ActiveMQ代理,它使用JMS,AMQP和MQTT连接到来自不同客户端的代理。出于某种原因,我们还没有弄清楚一组特定的MQTT客户端经常(并非总是)持久订阅。这是一个测试环境,客户端经常被添加和删除,后者有时通过拔插或重新启动嵌入式设备,以便他们不能正确取消订阅。效果(IIUC)是,代理为可能永远不会再看到的设备(我可以在http://my_broker:8161/admin/subscribers.jsp下看到这些设备)为其永久保留关于这些主题的消息,直到它最终在它自己的内存占用下崩溃。如何让我的ActiveMQ代理放置离线持久订阅者

这里的问题在于订阅者持久订阅,我们需要找出原因。然而,也有人认为客户这样做(不知情)不应该让经纪人陷入停顿,所以我们需要独立解决这个问题。

我发现there are settings for a timeout for offline durable subscriptions,把那些进入我们的代理配置(最后两行):

<broker 
    xmlns="http://activemq.apache.org/schema/core" 
    brokerName="my_broker" 
    dataDirectory="${activemq.data}" 
    useJmx="true" 
    advisorySupport="false" 
    persistent="false" 
    offlineDurableSubscriberTimeout="1800000" 
    offlineDurableSubscriberTaskSchedule="60000"> 

如果我理解正确的话,上面应该检查每一分钟和罢免从未出现过了半个多客户小时。然而,与文档相反,这看起来并不起作用:几天前,我订购了一个消费者然后拔掉插件的消费者仍然可以在脱机持久订阅者列表中看到,代理的内存占用量不断增加,如果我在代理的Web界面中手动删除订户,我可以看到内存占用降低。

因此,这里是我的问题:

  1. 什么决定MQTT订阅到主题上的ActiveMQ代理是否耐用?
  2. 我在设置ActiveMQ设置中脱机持久订阅的超时设置时做了什么错误?
+0

你尝试其他方式,用很短的一段TTL(生存时间)发布消息,并通过配置短** ** expireMessagesPeriod?根据文档,使用这种配置,系统必须在TTL周期结束后清除所有这些消息,这对于长时间丢失的持久订阅者(谁没有取消订阅)来说并不重要。这也应该有助于我们释放内存资源,因为消耗的实际内存是存储“消息”,而不是用于存储订户对象本身。 – blackpen

回答

2

我提取了删除超时的持久订阅的相关代码(doCleanup())。

在成功的情况下,执行:

LOG.info("Destroying durable subscriber due to inactivity: {}", sub); 

在故障情况下,执行:

LOG.error("Failed to remove inactive durable subscriber", e); 

查找上面的日志行中的日志文件,并与您在使用观察到的细节匹配它admin/subscribers.jsp查看器。如果它不打印任何行,订阅可能因为某种原因而保留active,或者您可能偶然发现了一个错误。

另外,如果可以,您是否可以尝试删除经纪人名称中的下划线(_)?该手册讨论了代理商名称中的下划线问题。

代码:

public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 
    super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 
    if (broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1 && broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1) { 
     this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true); 
     this.cleanupTask = new TimerTask() { 
     @Override 
     public void run() { 
      doCleanup(); 
     } 
     }; 
     this.cleanupTimer.schedule(cleanupTask, broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(),broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule()); 
    } 
} 

public void doCleanup() { 
    long now = System.currentTimeMillis(); 
    for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet()) { 
     DurableTopicSubscription sub = entry.getValue(); 
     if (!sub.isActive()) { 
     long offline = sub.getOfflineTimestamp(); 
     if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout()) { 
      LOG.info("Destroying durable subscriber due to inactivity: {}", sub); 
      try { 
       RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); 
       info.setClientId(entry.getKey().getClientId()); 
       info.setSubscriptionName(entry.getKey().getSubscriptionName()); 
       ConnectionContext context = new ConnectionContext(); 
       context.setBroker(broker); 
       context.setClientId(entry.getKey().getClientId()); 
       removeSubscription(context, info); 
      } catch (Exception e) { 
       LOG.error("Failed to remove inactive durable subscriber", e); 
      } 
     } 
     } 
    } 
} 

// The toString method for DurableTopicSubscription class 
@Override 
public synchronized String toString() { 
    return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + durableDestinations.size() + ", total=" + getSubscriptionStatistics().getEnqueues().getCount() + ", pending=" + getPendingQueueSize() + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount() + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension(); 
} 
+0

感谢您回答这个问题。我们的专业支持选项出现了两个事实:_1)_在ActiveMQ中确实存在一个错误,它会导致脱机持久订户失效。 _2)_为了从MQTT非持久地订阅,你需要连接'cleanSession'设置为true,并且QoS <1。 – sbi