我提取了删除超时的持久订阅的相关代码(doCleanup()
)。
在成功的情况下,执行:
LOG.info("Destroying durable subscriber due to inactivity: {}", sub);
在故障情况下,执行:
LOG.error("Failed to remove inactive durable subscriber", e);
查找上面的日志行中的日志文件,并与您在使用观察到的细节匹配它admin/subscribers.jsp查看器。如果它不打印任何行,订阅可能因为某种原因而保留active
,或者您可能偶然发现了一个错误。
另外,如果可以,您是否可以尝试删除经纪人名称中的下划线(_)?该手册讨论了代理商名称中的下划线问题。
代码:
public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
if (broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1 && broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1) {
this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true);
this.cleanupTask = new TimerTask() {
@Override
public void run() {
doCleanup();
}
};
this.cleanupTimer.schedule(cleanupTask, broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(),broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule());
}
}
public void doCleanup() {
long now = System.currentTimeMillis();
for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet()) {
DurableTopicSubscription sub = entry.getValue();
if (!sub.isActive()) {
long offline = sub.getOfflineTimestamp();
if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout()) {
LOG.info("Destroying durable subscriber due to inactivity: {}", sub);
try {
RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
info.setClientId(entry.getKey().getClientId());
info.setSubscriptionName(entry.getKey().getSubscriptionName());
ConnectionContext context = new ConnectionContext();
context.setBroker(broker);
context.setClientId(entry.getKey().getClientId());
removeSubscription(context, info);
} catch (Exception e) {
LOG.error("Failed to remove inactive durable subscriber", e);
}
}
}
}
}
// The toString method for DurableTopicSubscription class
@Override
public synchronized String toString() {
return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + durableDestinations.size() + ", total=" + getSubscriptionStatistics().getEnqueues().getCount() + ", pending=" + getPendingQueueSize() + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount() + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension();
}
你尝试其他方式,用很短的一段TTL(生存时间)发布消息,并通过配置短** ** expireMessagesPeriod?根据文档,使用这种配置,系统必须在TTL周期结束后清除所有这些消息,这对于长时间丢失的持久订阅者(谁没有取消订阅)来说并不重要。这也应该有助于我们释放内存资源,因为消耗的实际内存是存储“消息”,而不是用于存储订户对象本身。 – blackpen