2016-01-22 69 views
0

我正在使用嵌入式ActiveMQ代理。 我的目标是找到一种方法来检测队列上的外部生产者何时失去连接。如何在生产者连接中断时得到通知?

我开始经纪人是这样的:

BrokerService broker = new BrokerService(); 
broker.addConnector("tcp://" + LISTEN_DEVICE_IP + ":" + port); 
setLastMessagesPersistent(broker); 
broker.start(); 

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); 
connection = factory.createConnection(); 
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 

connection.start(); 

此后,我尝试添加TransportListener:

((ActiveMQConnection) connection).addTransportListener(new TransportListener() { 
    public void transportResumed() { 
     System.out.println("resumed"); 
    } 
    public void transportInterupted() { 
     System.out.println("interrupted"); 
    } 
    public void onException(IOException arg0) { 
     System.out.println("ioexception: " + arg0); 
    } 
    public void onCommand(Object arg0) { 
     System.out.println("command: " + arg0); 
    } 
}); 

我也注册一个消费者和ProducerListener这样的:

Destination dest = session.createQueue(queuename); 
MessageConsumer consumer = session.createConsumer(dest); 

ProducerEventSource source = new ProducerEventSource(connection, dest); 
System.out.println("Setting Producer Listener"); 
source.setProducerListener(prodevent -> { 
    System.out.println("producer status: " + prodevent.isStarted()); 
}); 
// Gets called from inside the broker's Thread and somehow causes deadlocks if I don't invoke this from the outside 
new Thread(() -> { 
    try { 
     consumer.setMessageListener(new NetworkEventPlayerAdapter(objectMapper, event, gameEventManager, playerID)); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
}).start(); 

不幸的是,TransportListener和ProducerListener都不能给我一个y当我强制退出先前作为制作人添加的另一个应用程序时(Alt + F4),输出y。该经纪人肯定会注意到:

WARN | Transport Connection to: tcp://127.0.0.1:58988 failed: java.net.SocketException: Connection reset 
WARN | Transport Connection to: tcp://127.0.0.1:58986 failed: java.net.SocketException: Connection reset 

但我没有找到一种方法来获取这些Java事件的回调。 我也尝试在代理中设置自定义IOExceptionHandler,并在连接中添加ExceptionListener。他们也从未被召唤过。

回答

1

您可以使用咨询主题ActiveMQ.Advisory.Connection或甚至ActiveMQ.Advisory.Producer.Queue ActiveMQ.Advisory.Producer.Topic,它们提供了关于生产者数量,检查此链接http://activemq.apache.org/advisory-message.html

+0

谢谢,这解决了我。听取具有“RemoveInfo”类型数据结构的咨询事件正是我所需要的。 – Felk

0

一个可能的方法是分析日志输出来进行连接重置

WARN |传输连接到:tcp://127.0.0.1:58988失败: java.net.SocketException:连接重置 警告|交通运输 连接到:TCP://127.0.0.1:58986失败:java.net.SocketException异常: 连接重置

由于插座中的ActiveMQ实现你必须添加的ExceptionListener有写自己异常处理例程...

相关问题