(1). 概述

一直以为Node的角色是网络通信的Server,但稍微剖析之后发现,它在网络通信层其实属于Client端.这一篇主要看一下Node的初始化方法.

(2). Node UML图解

"Node UML图解"

(3). Scheduler初始化

// Obtain the node's scheduler from the RaftTimerFactory (factory pattern).
// The shared-pool flag and the pool size are read from the node options; the last
// argument is the name passed to the factory for the scheduler.
this.timerManager = TIMER_FACTORY.getRaftScheduler(this.options.isSharedTimerPool(), this.options.getTimerPoolSize(), "JRaft-Node-ScheduleThreadPool");

(4). 定时任务初始化

// Init timers
// Every timer name below is suffixed with this node's id.
final String suffix = getNodeId().toString();
String name = "JRaft-VoteTimer-" + suffix;
// Vote timer: constructed with the election timeout from the node options.
this.voteTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(), TIMER_FACTORY.getVoteTimer(
	this.options.isSharedVoteTimer(), name)) {

	// Each firing delegates to the node's vote-timeout handler.
	@Override
	protected void onTrigger() {
		handleVoteTimeout();
	}

	// The timeout is passed through randomTimeout(); presumably this adds jitter so
	// that nodes do not time out simultaneously — confirm against randomTimeout().
	@Override
	protected int adjustTimeout(final int timeoutMs) {
		return randomTimeout(timeoutMs);
	}
};

// Election timer: constructed with the election timeout from the node options.
name = "JRaft-ElectionTimer-" + suffix;
this.electionTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(),
	TIMER_FACTORY.getElectionTimer(this.options.isSharedElectionTimer(), name)) {

	// Each firing delegates to the node's election-timeout handler.
	@Override
	protected void onTrigger() {
		handleElectionTimeout();
	}

	// The timeout is passed through randomTimeout(); presumably this adds jitter so
	// that nodes do not start elections simultaneously — confirm against randomTimeout().
	@Override
	protected int adjustTimeout(final int timeoutMs) {
		return randomTimeout(timeoutMs);
	}
};

// Step-down timer: constructed with half of the election timeout (>> 1).
// Unlike the vote/election timers it does not override adjustTimeout().
name = "JRaft-StepDownTimer-" + suffix;
this.stepDownTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs() >> 1,
	TIMER_FACTORY.getStepDownTimer(this.options.isSharedStepDownTimer(), name)) {

	// Each firing delegates to the node's step-down-timeout handler.
	@Override
	protected void onTrigger() {
		handleStepDownTimeout();
	}
};

// Snapshot timer: constructed with the snapshot interval converted from seconds to milliseconds.
name = "JRaft-SnapshotTimer-" + suffix;
this.snapshotTimer = new RepeatedTimer(name, this.options.getSnapshotIntervalSecs() * 1000,
	TIMER_FACTORY.getSnapshotTimer(this.options.isSharedSnapshotTimer(), name)) {

	// True until adjustTimeout() has been called once.
	private volatile boolean firstSchedule = true;

	// Each firing delegates to the node's snapshot-timeout handler.
	@Override
	protected void onTrigger() {
		handleSnapshotTimeout();
	}

	/**
	 * Randomizes only the first timeout; every later call returns {@code timeoutMs} unchanged.
	 *
	 * @param timeoutMs the configured timeout in milliseconds
	 * @return on the first call, a value in {@code [half, 2 * half)} where
	 *         {@code half = timeoutMs / 2}, or {@code timeoutMs} itself when {@code half}
	 *         is not positive; on later calls, {@code timeoutMs}
	 */
	@Override
	protected int adjustTimeout(final int timeoutMs) {
		if (!this.firstSchedule) {
			return timeoutMs;
		}

		// Randomize the first snapshot trigger timeout
		this.firstSchedule = false;
		final int half = timeoutMs / 2;
		// ThreadLocalRandom.nextInt(bound) rejects a non-positive bound, so a timeout of
		// 1 ms (half == 0) is returned as-is instead of throwing; zero and negative
		// timeouts are returned unchanged as before.
		if (half <= 0) {
			return timeoutMs;
		}
		return half + ThreadLocalRandom.current().nextInt(half);
	}
};

(5). Disruptor初始化

// Disruptor: build the ring buffer that carries LogEntryAndClosure events.
// It is sized by raftOptions.getDisruptorBufferSize(), accepts multiple producers
// (ProducerType.MULTI) and uses a blocking wait strategy.
this.applyDisruptor = DisruptorBuilder.<LogEntryAndClosure> newInstance() //
	.setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) //
	.setEventFactory(new LogEntryAndClosureFactory()) //
	.setThreadFactory(new NamedThreadFactory("JRaft-NodeImpl-Disruptor-", true)) //
	.setProducerType(ProducerType.MULTI) //
	.setWaitStrategy(new BlockingWaitStrategy()) //
	.build();
// Events are handled by LogEntryAndClosureHandler; LogExceptionHandler is installed
// as the default exception handler, tagged with this class's simple name.
this.applyDisruptor.handleEventsWith(new LogEntryAndClosureHandler());
this.applyDisruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName()));
// start() returns the queue that is kept in applyQueue.
this.applyQueue = this.applyDisruptor.start();
// Register disruptor metrics only when a metric registry is available.
if (this.metrics.getMetricRegistry() != null) {
	this.metrics.getMetricRegistry().register("jraft-node-impl-disruptor",
		new DisruptorMetricSet(this.applyQueue));
}

(6). FSMCaller构建

// Construct the FSM (finite state machine) caller. Only the instance is created here;
// its init(...) is invoked separately.
this.fsmCaller = new FSMCallerImpl();

(7). LogManager初始化

/**
 * Creates the log storage and the log manager, then initializes the log manager.
 * Requires the FSM caller to have been constructed already.
 *
 * @return the result of the log manager's {@code init} call
 */
private boolean initLogStorage() {
	Requires.requireNonNull(this.fsmCaller, "Null fsm caller");

	// Storage comes from the service factory, keyed by the configured log URI.
	this.logStorage = this.serviceFactory.createLogStorage(this.options.getLogUri(), this.raftOptions);
	this.logManager = new LogManagerImpl();

	// Collect everything the log manager needs into its options object.
	final LogManagerOptions logManagerOpts = new LogManagerOptions();
	logManagerOpts.setGroupId(this.groupId);
	logManagerOpts.setLogEntryCodecFactory(this.serviceFactory.createLogEntryCodecFactory());
	logManagerOpts.setLogStorage(this.logStorage);
	logManagerOpts.setConfigurationManager(this.configManager);
	logManagerOpts.setFsmCaller(this.fsmCaller);
	logManagerOpts.setNodeMetrics(this.metrics);
	logManagerOpts.setDisruptorBufferSize(this.raftOptions.getDisruptorBufferSize());
	logManagerOpts.setRaftOptions(this.raftOptions);

	final boolean initialized = this.logManager.init(logManagerOpts);
	return initialized;
}

(8). RaftMetaStorage初始化

/**
 * Creates and initializes the raft meta storage, then loads the current term and the
 * voted-for peer from it.
 *
 * @return {@code true} when the meta storage initialized, {@code false} otherwise
 */
private boolean initMetaStorage() {
	this.metaStorage = this.serviceFactory.createRaftMetaStorage(this.options.getRaftMetaUri(), this.raftOptions);

	final RaftMetaStorageOptions metaOpts = new RaftMetaStorageOptions();
	metaOpts.setNode(this);

	final boolean initialized = this.metaStorage.init(metaOpts);
	if (initialized) {
		// Take the term and a copy of the voted-for peer reported by the meta storage.
		this.currTerm = this.metaStorage.getTerm();
		this.votedId = this.metaStorage.getVotedFor().copy();
		return true;
	}

	LOG.error("Node {} init meta storage failed, uri={}.", this.serverId, this.options.getRaftMetaUri());
	return false;
}

(9). FSMCaller初始化

/**
 * Creates the closure queue and initializes the FSM caller with it.
 *
 * @param bootstrapId the log id handed to the FSM caller as its bootstrap id
 * @return {@code false} when no FSM caller instance exists, otherwise the result of
 *         the FSM caller's {@code init} call
 */
private boolean initFSMCaller(final LogId bootstrapId) {
	if (this.fsmCaller != null) {
		this.closureQueue = new ClosureQueueImpl(this.groupId);

		final FSMCallerOptions callerOpts = new FSMCallerOptions();
		// The shutdown callback ignores the status and forwards to afterShutdown().
		callerOpts.setAfterShutdown(status -> afterShutdown());
		callerOpts.setLogManager(this.logManager);
		callerOpts.setFsm(this.options.getFsm());
		callerOpts.setClosureQueue(this.closureQueue);
		callerOpts.setNode(this);
		callerOpts.setBootstrapId(bootstrapId);
		callerOpts.setDisruptorBufferSize(this.raftOptions.getDisruptorBufferSize());
		return this.fsmCaller.init(callerOpts);
	}

	LOG.error("Fail to init fsm caller, null instance, bootstrapId={}.", bootstrapId);
	return false;
}

(10). BallotBox初始化

// Construct the ballot box
this.ballotBox = new BallotBox();
final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions();
// The FSM caller is registered as the ballot box's waiter
ballotBoxOpts.setWaiter(this.fsmCaller);
// Hand over the closure queue
ballotBoxOpts.setClosureQueue(this.closureQueue);
// Initialize the ballot box; on failure the enclosing method logs and returns false
if (!this.ballotBox.init(ballotBoxOpts)) {
	LOG.error("Node {} init ballotBox failed.", getNodeId());
	return false;
}

(11). SnapshotExecutor初始化

/**
 * Creates and initializes the snapshot executor when a snapshot URI is configured.
 *
 * @return {@code true} when no snapshot URI is set (snapshotting is skipped),
 *         otherwise the result of the snapshot executor's {@code init} call
 */
private boolean initSnapshotStorage() {
	// Without a snapshot URI there is nothing to set up; this is not treated as an error.
	if (StringUtils.isEmpty(this.options.getSnapshotUri())) {
		LOG.warn("Do not set snapshot uri, ignore initSnapshotStorage.");
		return true;
	}

	this.snapshotExecutor = new SnapshotExecutorImpl();

	final SnapshotExecutorOptions executorOpts = new SnapshotExecutorOptions();
	executorOpts.setUri(this.options.getSnapshotUri());
	executorOpts.setFsmCaller(this.fsmCaller);
	executorOpts.setNode(this);
	executorOpts.setLogManager(this.logManager);
	// The address is this server's endpoint, or null when no server id is set.
	executorOpts.setAddr(this.serverId != null ? this.serverId.getEndpoint() : null);
	executorOpts.setInitTerm(this.currTerm);
	executorOpts.setFilterBeforeCopyRemote(this.options.isFilterBeforeCopyRemote());
	// Snapshot throttle is taken from the node options.
	executorOpts.setSnapshotThrottle(this.options.getSnapshotThrottle());

	final boolean initialized = this.snapshotExecutor.init(executorOpts);
	return initialized;
}

(12). ReplicatorGroup和RaftClientService初始化

this.replicatorGroup = new ReplicatorGroupImpl();
// ************************************************************************************
// Note: what is initialized here is the CLIENT side of the network communication
// (DefaultRaftClientService), not a server.
// ************************************************************************************
this.rpcService = new DefaultRaftClientService(this.replicatorGroup, this.options.getAppendEntriesExecutors());
// Options for the replicator group: timeouts, log manager, ballot box, RPC client,
// snapshot storage (null when no snapshot executor exists) and the shared timer manager.
final ReplicatorGroupOptions rgOpts = new ReplicatorGroupOptions();
rgOpts.setHeartbeatTimeoutMs(heartbeatTimeout(this.options.getElectionTimeoutMs()));
rgOpts.setElectionTimeoutMs(this.options.getElectionTimeoutMs());
rgOpts.setLogManager(this.logManager);
rgOpts.setBallotBox(this.ballotBox);
rgOpts.setNode(this);
rgOpts.setRaftRpcClientService(this.rpcService);
rgOpts.setSnapshotStorage(this.snapshotExecutor != null ? this.snapshotExecutor.getSnapshotStorage() : null);
rgOpts.setRaftOptions(this.raftOptions);
rgOpts.setTimerManager(this.timerManager);

// Publish the node's metric registry through the node options before the RPC service
// is initialized with those options.
this.options.setMetricRegistry(this.metrics.getMetricRegistry());

// The RPC client service is initialized first; if that fails the enclosing method
// returns false and the replicator group is never initialized.
if (!this.rpcService.init(this.options)) {
	LOG.error("Fail to init rpc service.");
	return false;
}
this.replicatorGroup.init(new NodeId(this.groupId, this.serverId), rgOpts);

(13). ReadOnlyService初始化

// Construct the read-only service and give it the FSM caller, this node and the raft options.
this.readOnlyService = new ReadOnlyServiceImpl();
final ReadOnlyServiceOptions rosOpts = new ReadOnlyServiceOptions();
rosOpts.setFsmCaller(this.fsmCaller);
rosOpts.setNode(this);
rosOpts.setRaftOptions(this.raftOptions);

// On failure the enclosing method logs and returns false.
if (!this.readOnlyService.init(rosOpts)) {
	LOG.error("Fail to init readOnlyService.");
	return false;
}

(14). 快照定时器启动

// Start the snapshot timer only when a snapshot executor exists and a positive
// snapshot interval is configured.
if (this.snapshotExecutor != null) {
	if (this.options.getSnapshotIntervalSecs() > 0) {
		LOG.debug("Node {} start snapshot timer, term={}.", getNodeId(), this.currTerm);
		this.snapshotTimer.start();
	}
}

(16). 开启选举定时任务

// **********************************************************************************
// Example configuration: 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
// When the configuration (this.conf) is not empty, call stepDown at the current term
// **********************************************************************************
if (!this.conf.isEmpty()) { 
	stepDown(this.currTerm, false, new Status());
} // end if



// Abridged excerpt of stepDown: the earlier part of the body is elided ("... ...");
// only the tail that restarts the election timer is shown.
private void stepDown(final long term, final boolean wakeupCandidate, final Status status) {   
  // ... ...
  if (!isLearner()) {
	  // ********************************************************************************
	  // Restart the election timer (skipped when this node is a learner)
	  // ********************************************************************************
	  this.electionTimer.restart();
  } // end if  
}

(15). 总结

Node的初始化还是比较复杂的,但Node所依赖的对象我们都已经做过源码剖析,所以应该不难理解.