如下是一个PSS(一主两备)架构的副本集,主节点除了与两个备节点执行数据复制之外,三个节点之间还会通过心跳感知彼此的存活。
一旦主节点发生故障以后,备节点将在某个周期内检测到主节点处于不可达的状态,此后将由其中一个备节点事先发起选举并最终成为新的主节点。 这个检测周期 由electionTimeoutMillis 参数确定,默认是10s。
接下来,我们通过一些源码看看该机制是如何实现的:
<<来自 MongoDB 3.4源码>>
db/repl/replication_coordinator_impl_heartbeat.cpp
相关方法
- ReplicationCoordinatorImpl::_startHeartbeats_inlock 启动各成员的心跳
- ReplicationCoordinatorImpl::_scheduleHeartbeatToTarget 调度任务-(计划)向成员发起心跳
- ReplicationCoordinatorImpl::_doMemberHeartbeat 执行向成员发起心跳
- ReplicationCoordinatorImpl::_handleHeartbeatResponse 处理心跳响应
- ReplicationCoordinatorImpl::_scheduleNextLivenessUpdate_inlock 调度保活状态检查定时器
- ReplicationCoordinatorImpl::_cancelAndRescheduleElectionTimeout_inlock 取消并重新调度选举超时定时器
- ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1 发起主动选举
db/repl/topology_coordinator_impl.cpp
相关方法
- TopologyCoordinatorImpl::prepareHeartbeatRequestV1 构造心跳请求数据
- TopologyCoordinatorImpl::processHeartbeatResponse 处理心跳响应并构造下一步Action实例
下面这个图,描述了各个方法之间的调用关系
图-主要关系
心跳的实现
首先,在副本集组建完成之后,节点会通过ReplicationCoordinatorImpl::_startHeartbeats_inlock方法开始向其他成员发送心跳:
- void ReplicationCoordinatorImpl::_startHeartbeats_inlock() {
- const Date_t now = _replExecutor.now();
- _seedList.clear();
- //获取副本集成员
- for (int i = 0; i < _rsConfig.getNumMembers(); ++i) {
- if (i == _selfIndex) {
- continue;
- }
- //向其他成员发送心跳
- _scheduleHeartbeatToTarget(_rsConfig.getMemberAt(i).getHostAndPort(), i, now);
- }
- //仅仅是刷新本地的心跳状态数据
- _topCoord->restartHeartbeats();
- //使用V1的选举协议(3.2之后)
- if (isV1ElectionProtocol()) {
- for (auto&& slaveInfo : _slaveInfo) {
- slaveInfo.lastUpdate = _replExecutor.now();
- slaveInfo.down = false;
- }
- //调度保活状态检查定时器
- _scheduleNextLivenessUpdate_inlock();
- }
- }
在获得当前副本集的节点信息后,调用_scheduleHeartbeatToTarget方法对其他成员发送心跳,
这里_scheduleHeartbeatToTarget 的实现比较简单,其真正发起心跳是由 _doMemberHeartbeat 实现的,如下:
- void ReplicationCoordinatorImpl::_scheduleHeartbeatToTarget(const HostAndPort& target,
- int targetIndex,
- Date_t when) {
- //执行调度,在某个时间点调用_doMemberHeartbeat
- _trackHeartbeatHandle(
- _replExecutor.scheduleWorkAt(when,
- stdx::bind(&ReplicationCoordinatorImpl::_doMemberHeartbeat,
- this,
- stdx::placeholders::_1,
- target,
- targetIndex)));
- }
ReplicationCoordinatorImpl::_doMemberHeartbeat 方法的实现如下:
- void ReplicationCoordinatorImpl::_doMemberHeartbeat(ReplicationExecutor::CallbackArgs cbData,
- const HostAndPort& target,
- int targetIndex) {
- LockGuard topoLock(_topoMutex);
- //取消callback 跟踪
- _untrackHeartbeatHandle(cbData.myHandle);
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
- const Date_t now = _replExecutor.now();
- BSONObj heartbeatObj;
- Milliseconds timeout(0);
- //3.2 以后的版本
- if (isV1ElectionProtocol()) {
- const std::pair<ReplSetHeartbeatArgsV1, Milliseconds> hbRequest =
- _topCoord->prepareHeartbeatRequestV1(now, _settings.ourSetName(), target);
- //构造请求,设置一个timeout
- heartbeatObj = hbRequest.first.toBSON();
- timeout = hbRequest.second;
- } else {
- ...
- }
- //构造远程命令
- const RemoteCommandRequest request(
- target, "admin", heartbeatObj, BSON(rpc::kReplSetMetadataFieldName << 1), nullptr, timeout);
- //设置远程命令回调,指向_handleHeartbeatResponse方法
- const ReplicationExecutor::RemoteCommandCallbackFn callback =
- stdx::bind(&ReplicationCoordinatorImpl::_handleHeartbeatResponse,
- this,
- stdx::placeholders::_1,
- targetIndex);
- _trackHeartbeatHandle(_replExecutor.scheduleRemoteCommand(request, callback));
- }
(编辑:好传媒网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|