2016-07-15 110 views
1

我正在使用jmx来监视kafka主题。使用jmx显示器kafka主题

val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker1:9393/jmxrmi"); 

val jmxc = JMXConnectorFactory.connect(url, null); 
val mbsc = jmxc.getMBeanServerConnection(); 
val messageCountObj = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=mytopic"); 
val messagesInPerSec = mbsc.getAttribute(messageCountObj,"MeanRate") 

使用此代码我可以在broker1上获得“mytopic”的MeanRate。 但我有10个经纪商,我怎么能从所有经纪商那里获得“mytopic”的MeanRate?

我有尝试 “服务:JMX:RMI:/// JNDI/RMI:// BROKER1:9393,broker2:9393,broker3上:9393/jmxrmi”

得到一个错误:(

回答

0

如果是这么简单,这将是很好的;)

如你所述,没有办法做到这一点。您需要为每个经纪人分别建立连接。

一个可行的办法是使用MBeanServer Federation这将在一个MBeanServer中注册为每个经纪人的代理,因此,如果你这样做对BROKER1,你可以连接到service:jmx:rmi:///jndi/rmi://broker1:9393/jmxrmi和查询统计为您的所有经纪人在一个去,但你需要查询10个不同的ObjectName,查询每个ObjectNames的值,然后自己计算MeanRate。 [Java]伪代码:

ObjectName wildcard = new ObjectName("*:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=mytopic"); 
double totalRate = 0d; 
int respondingBrokers = 0; 
for(ObjectName on : mbsc.queryNames(wildcard, null)) { 
    totalRate += (Double)mbsc.getAttribute(messageCountObj,"MeanRate"); 
    respondingBrokers++; 
} 
// Average rate of mean rates: totalRate/respondingBrokers 

注意:没有异常处理,并且我假定rate类型是Double。

您也可以创建并注册一个计算联合代理上的聚合平均值的自定义MBean。

如果你是面向maven的,你可以从here构建OpenDMK。