回答

6

TL; DR

ClusterListener接口可用于监控replicaset的某些方面,但如果你想更深入和/或如果你想询问事件在其外侧replicaset状态在ClusterListener提供的回调,那么你可能更愿意调用replSetGetStatus命令并检查其输出。

详细

ClusterListener提供回调,让你看/来改变你的replicaset回应。例如,下面的CLusterListener ...

public class LoggingClusterListener implements ClusterListener { 
    private static final Logger logger = LoggerFactory.getLogger(LoggingClusterListener.class); 

    @Override 
    public void clusterOpening(final ClusterOpeningEvent clusterOpeningEvent) { 
     logger.info("clusterOpening: {}", clusterOpeningEvent.getClusterId().getValue()); 
    } 

    @Override 
    public void clusterClosed(final ClusterClosedEvent clusterClosedEvent) { 
     logger.info("clusterClosed: {}", clusterClosedEvent.getClusterId().getValue()); 
    } 

    @Override 
    public void clusterDescriptionChanged(final ClusterDescriptionChangedEvent event) { 
     logger.info("clusterDescriptionChanged: {}", event.getClusterId().getValue()); 
     for (ServerDescription sd : event.getNewDescription().getServerDescriptions()) { 
      logger.info("{}/{}/{}/{}", sd.getType(), sd.getCanonicalAddress(), sd.getState().name()); 
     } 
    } 
} 

...当一个MongoClient喜欢此相关的...

final MongoClientOptions options = MongoClientOptions.builder() 
    .addClusterListener(new LoggingClusterListener()) 
    .build(); 
return new MongoClient(serverAddresses, options); 

...会发出以下记录:

// cluster starting up ... 
2017-08-17 12:49:55,977 [main] clusterOpening: 599582e36d47c231ec963b0b 
2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017] clusterDescriptionChanged: 599582e36d47c231ec963b0b 
2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostB:27017] clusterDescriptionChanged: 599582e36d47c231ec963b0b 
2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostC:27017] clusterDescriptionChanged: 599582e36d47c231ec963b0b 
2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017] clusterDescriptionChanged 599582e36d47c231ec963b0b 
2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017] REPLICA_SET_OTHER/hostB:27017/CONNECTED/{}  
2017-08-17 12:49:56,077 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017] REPLICA_SET_OTHER/hostC:27017/CONNECTED/{}  
2017-08-17 12:49:56,077 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017] REPLICA_SET_SECONDARY/hostA:27017/CONNECTED/{}  
// ... the primary fails over to hostA:27017 
2017-08-17 12:50:06,080 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017] clusterDescriptionChanged: 599582e36d47c231ec963b0b 
2017-08-17 12:50:06,080 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017] REPLICA_SET_OTHER/hostB:27017/CONNECTED/{}  
2017-08-17 12:50:06,080 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017] REPLICA_SET_SECONDARY/hostC:27017/CONNECTED/{}  
2017-08-17 12:50:06,080 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017] REPLICA_SET_PRIMARY/hostA:27017/CONNECTED/{} 
2017-08-17 12:50:07,126 [main] clusterClosed: 599582e36d47c231ec963b0b 

这也许够你用的,但如果没有,例如,如果你想主动监控replicaset状态 - 而不是当出现以下情况之一只响应...

  • 集群开始
  • 集群停止
  • 集群的更改说明

...那么你可能更愿意定期采样replicaset状态并报告/日志/警报的结果。您可以通过执行replSetGetStatus命令并询问结果来完成此操作。该命令返回一个BsonDocument(其格式被描述here),其可以被询问并记录。

登录状态文件是最简单的响应,但这种方法可以通过增强的文档内容的基础上,提高警报例如,形成监控解决方案的基础

  • replicationLag>配置threadhold
  • lastHeartbeat>现在() - 配置的阈值
  • 初级的身份已经改变
  • 健康= 1

以下代码读取replicaset状态do询问它(包括计算复制滞后)并记录输出。

MongoReplicaSetStatusLogger mongoReplicaSetStatusLogger = new MongoReplicaSetStatusLogger(); 

// periodically ... 
MongoClient mongoClient = getMongoClient(); 

MongoDatabase admin = mongoClient.getDatabase("admin"); 
BsonDocument commandResult = admin.runCommand(new BsonDocument("replSetGetStatus", new BsonInt32(1)), BsonDocument.class); 
mongoReplicaSetStatusLogger.report(commandResult); 

这里的MongoReplicaSetStatusLogger实现:

import org.bson.BsonDocument; 
import org.bson.BsonInvalidOperationException; 
import org.bson.BsonNumber; 
import org.bson.BsonValue; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.text.SimpleDateFormat; 
import java.util.Date; 
import java.util.Optional; 

public class MongoReplicaSetStatusLogger { 
    private static final Logger logger = LoggerFactory.getLogger(MongoReplicaSetStatusLogger.class); 

    private static final SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss,SSSZ"); 

    private static final String DEFAULT_VALUE = "UNKNOWN"; 
    private static final String MEMBERS = "members"; 

    public void report(BsonDocument replicasetStatusDocument) { 
     if (hasMembers(replicasetStatusDocument)) { 
      replicasetStatusDocument.getArray(MEMBERS).stream() 
        .filter(BsonValue::isDocument) 
        .map(memberDocument -> (BsonDocument) memberDocument) 
        .forEach(memberDocument -> logMemberDocument(memberDocument)); 
     } else { 
      logger.warn("The replicaset status document does not contain a '{}' attributes, perhaps there has been " + 
        "a MongoDB upgrade and the format has changed!", MEMBERS); 
     } 
    } 

    private boolean hasMembers(BsonDocument replicasetStatusDocument) { 
     return replicasetStatusDocument.containsKey(MEMBERS) && replicasetStatusDocument.get(MEMBERS).isArray(); 
    } 

    private void logMemberDocument(BsonDocument memberDocument) { 
     StringBuilder stringBuilder = new StringBuilder() 
       .append(logAttribute("node", getStringValue(memberDocument, "name"))) 
       .append(logAttribute("health", getNumericValue(memberDocument, "health"))) 
       .append(logAttribute("state", getStringValue(memberDocument, "stateStr"))) 
       .append(logAttribute("uptime(s)", getNumericValue(memberDocument, "uptime"))) 
       .append(logAttribute("lastOptime", getDateTimeValue(memberDocument, "optimeDate"))) 
       .append(logAttribute("lastHeartbeat", getDateTimeValue(memberDocument, "lastHeartbeat"))) 
       .append(logAttribute("lastHeartbeatRecv", getDateTimeValue(memberDocument, "lastHeartbeatRecv"))) 
       .append(logAttribute("ping(ms)", getNumericValue(memberDocument, "pingMs"))) 
       .append(logAttribute("replicationLag(s)", getReplicationLag(memberDocument))); 

     logger.error(stringBuilder.toString()); 
    } 

    private String logAttribute(String key, Optional<String> value) { 
     return new StringBuilder(key).append("=").append(value.orElse(DEFAULT_VALUE)).append("|").toString(); 
    } 

    private Optional<String> getStringValue(BsonDocument memberDocument, String key) { 
     if (memberDocument.containsKey(key)) { 
      try { 
       return Optional.of(memberDocument.getString(key).getValue().toUpperCase()); 
      } catch (BsonInvalidOperationException e) { 
       logger.warn("Exception reading: {} from replicaset status document, message: {}.", key, e.getMessage()); 
      } 
     } 
     return Optional.empty(); 
    } 

    private Optional<String> getNumericValue(BsonDocument memberDocument, String key) { 
     if (memberDocument.containsKey(key)) { 
      BsonNumber bsonNumber = memberDocument.getNumber(key); 
      if (bsonNumber.isInt32()) { 
       return Optional.of(Integer.toString(bsonNumber.intValue())); 
      } else if (bsonNumber.isInt64()) { 
       return Optional.of(Long.toString(bsonNumber.longValue())); 
      } else if (bsonNumber.isDouble()) { 
       return Optional.of(Double.toString(bsonNumber.doubleValue())); 
      } 
     } 
     return Optional.empty(); 
    } 

    private Optional<String> getDateTimeValue(BsonDocument memberDocument, String key) { 
     if (memberDocument.containsKey(key)) { 
      try { 
       return Optional.of(dateFormatter.format(new Date(memberDocument.getDateTime(key).getValue()))); 
      } catch (BsonInvalidOperationException e) { 
       logger.warn("Exception reading: {} from replicaset status document due to: {}!", key, e.getMessage()); 
      } 
     } 
     return Optional.empty(); 
    } 

    private Optional<String> getReplicationLag(BsonDocument memberDocument) { 
     if (memberDocument.containsKey("optimeDate") && memberDocument.containsKey("lastHeartbeat")) { 
      try { 
       long optimeDate = memberDocument.getDateTime("optimeDate").getValue(); 
       long lastHeartbeat = memberDocument.getDateTime("lastHeartbeat").getValue(); 
       long replicationLag = lastHeartbeat - optimeDate; 
       return Optional.of(Long.toString(replicationLag)); 
      } catch (BsonInvalidOperationException e) { 
       logger.warn("Exception reading 'optimeDate' or 'lastHeartbeat' from replicaset status document due to: {}!", e.getMessage()); 
      } catch (IllegalArgumentException e) { 
       logger.warn("Exception calculating the replication lag due to: {}!", e.getMessage()); 
      } 
     } 
     return Optional.empty(); 
    } 
} 

下面是输出的一个例子:

2017-08-17 15:44:35,192|[main]|ERROR|MongoReplicaSetStatusLogger|node=hostA:27017|health=1.0|state=PRIMARY|uptime(s)=21|lastOptime=2017-08-17T15:43:32,000+0100|lastHeartbeat=UNKNOWN|lastHeartbeatRecv=UNKNOWN|ping(ms)=UNKNOWN|replicationLag(s)=UNKNOWN| 
2017-08-17 15:44:35,193|[main]|ERROR|MongoReplicaSetStatusLogger|node=hostB:27017|health=1.0|state=SECONDARY|uptime(s)=17|lastOptime=2017-08-17T15:43:20,000+0100|lastHeartbeat=2017-08-17T15:43:35,443+0100|lastHeartbeatRecv=2017-08-17T15:43:36,412+0100|ping(ms)=0|replicationLag(s)=15443| 
2017-08-17 15:44:35,193|[main]|ERROR|MongoReplicaSetStatusLogger|node=hostC:27017|health=1.0|state=SECONDARY|uptime(s)=17|lastOptime=2017-08-17T15:43:20,000+0100|lastHeartbeat=2017-08-17T15:43:35,444+0100|lastHeartbeatRecv=2017-08-17T15:43:36,470+0100|ping(ms)=0|replicationLag(s)=15444| 
+0

谢谢你的见解。 :) – nullpointer