(1). 概述

前面剖析了预投票过程。预投票是为了防止无效投票,只有预投票通过之后,才能进入正式投票阶段。所以,这一篇主要剖析发起投票请求的过程。

(2). 入口在哪(NodeImpl.handlePreVoteResponse)

if (response.getGranted()) { // the responding peer granted its pre-vote to this node
	this.prevVoteCtx.grant(peerId);
	if (this.prevVoteCtx.isGranted()) {
		doUnlock = false;
		// *******************************************************************************
		// The pre-vote ballot is now granted, so start the real election.
		// doUnlock is cleared first; presumably the enclosing method would otherwise
		// release the lock itself, while electSelf() releases it internally
		// (NOTE(review): the rest of handlePreVoteResponse is not shown here; confirm).
		// *******************************************************************************
		electSelf();
	}
}

(3). NodeImpl.electSelf

/**
 * Turns this node into a candidate for a new term and asks every other peer for its vote.
 *
 * <p>Locking contract: the method must be entered with {@code writeLock} already held.
 * The first {@code try} block has no matching {@code lock()} call; its {@code finally}
 * releases the lock acquired by the caller. The lock is then re-acquired for the second
 * phase and released again before returning, so the method always returns with the
 * lock released.
 */
private void electSelf() {
	long oldTerm;
	try {
		LOG.info("Node {} start vote and grant vote self, term={}.", getNodeId(), this.currTerm);
		if (!this.conf.contains(this.serverId)) { // this node is not part of the current configuration: it may not stand for election
			LOG.warn("Node {} can't do electSelf as it is not in {}.", getNodeId(), this.conf);
			return;
		}

		if (this.state == State.STATE_FOLLOWER) {
			LOG.debug("Node {} stop election timer, term={}.", getNodeId(), this.currTerm);
			// A follower entering the election stops its election timer first.
			this.electionTimer.stop();
		}

		// Clear the known leader: leaderId is reset to the empty peer, with a status
		// explaining that the reset is due to starting request_vote.
		resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT, "A follower's leader_id is reset to NULL as it begins to request_vote."));
		
		// Become a candidate, move to the next term and record the vote for ourselves.
		this.state = State.STATE_CANDIDATE;
		this.currTerm++;
		this.votedId = this.serverId.copy();
		LOG.debug("Node {} start vote timer, term={} .", getNodeId(), this.currTerm);

		// ********************************************************************
		// Start the vote timer (a timer, not a dedicated thread of this method).
		// ********************************************************************
		this.voteTimer.start();
		// Initialise the ballot with the current configuration and, when the
		// configuration is not stable, the old configuration as well.
		this.voteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
		// Remember the term we are campaigning for so a concurrent change can be detected below.
		oldTerm = this.currTerm;
	} finally {
		// Releases the lock that the caller acquired.
		this.writeLock.unlock();
	}

	// Read the last log id without holding the write lock.
	// NOTE(review): the `true` argument presumably requests a flush before reading; confirm against LogManager.
	final LogId lastLogId = this.logManager.getLastLogId(true);

	this.writeLock.lock();
	try {
		// vote need defense ABA after unlock&writeLock
		// If the term moved while the lock was released, this campaign is stale: abandon it.
		if (oldTerm != this.currTerm) {
			LOG.warn("Node {} raise term {} when getLastLogId.", getNodeId(), this.currTerm);
			return;
		}

		// Example peer list: 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
		for (final PeerId peer : this.conf.listPeers()) {
			if (peer.equals(this.serverId)) { // skip this node itself (e.g. 127.0.0.1:8081)
				continue;
			}

			// Connect to each peer in turn; a peer that cannot be connected is
			// logged and skipped, and the loop moves on to the next one.
			if (!this.rpcService.connect(peer.getEndpoint())) {
				LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
				continue;
			}

			// Build the vote request; the callback is created with the term of this campaign.
			final OnRequestVoteRpcDone done = new OnRequestVoteRpcDone(peer, this.currTerm, this);
			done.request = RequestVoteRequest.newBuilder() //
				.setPreVote(false) // It's not a pre-vote request.
				.setGroupId(this.groupId) //
				.setServerId(this.serverId.toString()) //
				.setPeerId(peer.toString()) //
				.setTerm(this.currTerm) //
				.setLastLogIndex(lastLogId.getIndex()) //
				.setLastLogTerm(lastLogId.getTerm()) //
				.build();
			// *******************************************************************************
			// Send the vote request to this peer; `done` is passed as the response callback.
			// *******************************************************************************
			this.rpcService.requestVote(peer.getEndpoint(), done.request, done);
		}
		
		
		// Record the new term and the self-vote in metaStorage, then count our own vote.
		this.metaStorage.setTermAndVotedFor(this.currTerm, this.serverId);
		this.voteCtx.grant(this.serverId);
		
		if (this.voteCtx.isGranted()) {
			// ****************************************************************
			// The ballot is already granted with our own vote: become leader now.
			// ****************************************************************
			becomeLeader();
		}
	} finally {
		this.writeLock.unlock();
	} // end finally
}

(4). voteTimer初始化

// Vote timer: a RepeatedTimer built with the election timeout from options.
// Each trigger calls handleVoteTimeout(), and every timeout value is passed
// through randomTimeout() before use.
this.voteTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(), TIMER_FACTORY.getVoteTimer(this.options.isSharedVoteTimer(), name)) {

	// Invoked when the timer fires.
	@Override
	protected void onTrigger() {
		handleVoteTimeout();
	}

	// Randomizes the configured timeout.
	@Override
	protected int adjustTimeout(final int timeoutMs) {
		return randomTimeout(timeoutMs);
	}
};

(5). NodeImpl.handleVoteTimeout

/**
 * Vote-timer callback: reacts to a vote round that did not finish in time.
 *
 * <p>Acquires {@code writeLock}. If the node is no longer a candidate the lock is
 * released here and nothing else happens. Otherwise the lock is deliberately NOT
 * released in this method: it is handed over to {@code preVote()} or
 * {@code electSelf()}, which are expected to unlock it.
 */
private void handleVoteTimeout() {
	this.writeLock.lock();

	if (this.state == State.STATE_CANDIDATE) {
		// The branch is selected by a configuration flag, not by a timing check.
		if (this.raftOptions.isStepDownWhenVoteTimedout()) {
			LOG.warn( "Candidate node {} term {} steps down when election reaching vote timeout: fail to get quorum vote-granted.",this.nodeId, this.currTerm);
			// Step down within the current term, then start over with a pre-vote round.
			final Status timedOut = new Status(RaftError.ETIMEDOUT, "Vote timeout: fail to get quorum vote-granted.");
			stepDown(this.currTerm, false, timedOut);
			// the write lock is released inside preVote
			preVote();
		} else {
			// Stay a candidate and campaign again.
			LOG.debug("Node {} term {} retry to vote self.", getNodeId(), this.currTerm);
			// the write lock is released inside electSelf
			electSelf();
		}
		return;
	}

	// Not a candidate: the timeout is irrelevant, just release the lock.
	this.writeLock.unlock();
}

(6). 晋升为Leader(NodeImpl.becomeLeader)

/**
 * Promotes this candidate to leader of the group for the current term.
 *
 * <p>Requires the node to be in {@code STATE_CANDIDATE}. Stops the vote timer, switches
 * state and leader id to this node, resets the replicator group to the current term,
 * adds a replicator for every other peer and every learner, resets the ballot box's
 * pending index, flushes the configuration context and starts the step-down timer.
 *
 * @throws IllegalStateException if the configuration context is busy
 */
private void becomeLeader() {
	Requires.requireTrue(this.state == State.STATE_CANDIDATE, "Illegal state: " + this.state);
	LOG.info("Node {} become leader of group, term={}, conf={}, oldConf={}.", getNodeId(), this.currTerm,
		this.conf.getConf(), this.conf.getOldConf());

	// The election is over: cancel the candidate vote timer.
	stopVoteTimer();

	// This node is now the leader.
	this.state = State.STATE_LEADER;
	this.leaderId = this.serverId.copy();

	// Bring the replicator group to the current term.
	this.replicatorGroup.resetTerm(this.currTerm);

	// Start a replicator for every peer other than this node.
	for (final PeerId follower : this.conf.listPeers()) {
		if (!follower.equals(this.serverId)) {
			LOG.debug("Node {} add a replicator, term={}, peer={}.", getNodeId(), this.currTerm, follower);
			final boolean added = this.replicatorGroup.addReplicator(follower);
			if (!added) {
				LOG.error("Fail to add a replicator, peer={}.", follower);
			}
		}
	}

	// Start a replicator for every learner.
	for (final PeerId learner : this.conf.listLearners()) {
		LOG.debug("Node {} add a learner replicator, term={}, peer={}.", getNodeId(), this.currTerm, learner);
		final boolean added = this.replicatorGroup.addReplicator(learner, ReplicatorType.Learner);
		if (!added) {
			LOG.error("Fail to add a learner replicator, peer={}.", learner);
		}
	}

	// Initialise the commit manager: pending entries start right after the last log index.
	this.ballotBox.resetPendingIndex(this.logManager.getLastLogIndex() + 1);

	// Register the configuration context so that configuration changes are rejected
	// before the first log is committed; a busy context at this point is illegal.
	if (this.confCtx.isBusy()) {
		throw new IllegalStateException();
	}

	this.confCtx.flush(this.conf.getConf(), this.conf.getOldConf());
	this.stepDownTimer.start();
}

(7). 总结

在正式发起投票前,必须先在预投票阶段获得过半节点的同意,才能进入投票过程。投票和预投票的流程没有太大的区别:只有达到法定票数后,本节点才会晋升为Leader。