(1). 概述

本来是要剖析:Replicator的,但是,发现Replicator,它依赖很多的其余组件,所以,这一篇开始,先解决内部组件依赖的剖析,再分析Replicator,这一篇的重头戏是:BallotBox

(2). BallotBox UML图

"BallotBox UML图"

(3). BallotBoxTest

@RunWith(value = MockitoJUnitRunner.class)
public class BallotBoxTest {
    private static final String GROUP_ID = "group001";
    private BallotBox           box;
    @Mock
    private FSMCaller           waiter;
    private ClosureQueueImpl    closureQueue;

    @Before
    public void setup() {
        BallotBoxOptions opts = new BallotBoxOptions();
        this.closureQueue = new ClosureQueueImpl(GROUP_ID);
        opts.setClosureQueue(this.closureQueue);
        opts.setWaiter(this.waiter);

        // ********************************************************************
        // BallotBox初始化需要依赖:ClosureQueue和FSMCaller
		// ********************************************************************
        box = new BallotBox();
        assertTrue(box.init(opts));
    } // end setup
	
	
	// ********************************************************************
	// RAFT的要求是这样的:
	// 在复制日志时,要求过半的节点都把数据复制完成,这条日志才算是完成,才可以应用到状态机,所以,commitAt是肯定会调用状态机的.
	// ********************************************************************
	@Test
	public void testCommitAt() {
		// ********************************************************************
		// 仅第一次需要重置下:pendingIndex
		// ********************************************************************
		assertTrue(box.resetPendingIndex(1));
		
		// ********************************************************************
		// 添加一个任务
		// 注意:在这里指定了三个节点(localhost:8081,localhost:8082,localhost:8083)来着的
		// ********************************************************************
		assertTrue(this.box.appendPendingTask(
			JRaftUtils.getConfiguration("localhost:8081,localhost:8082,localhost:8083"),
			JRaftUtils.getConfiguration("localhost:8081"), new Closure() {

				@Override
				public void run(Status status) {

				}
		}));
			
		// *********************************************************	
		// 这一部份,涉及到日志提交,留到下一篇进行剖析	
		// *********************************************************	
		// ... ...
	} // end testCommitAt
}	

(4). BallotBox属性

// 状态机
private FSMCaller                 waiter;

// 前面分析过,里面存储的是:Closure,可以队列尾添加Closure,并且,可以从队首弹出N个Closure.
private ClosureQueue              closureQueue;

private final StampedLock         stampedLock        = new StampedLock();

// 记录:过半节点确认复制完日志的索引
private long                      lastCommittedIndex = 0;

// *************************************************************************
// pendingIndex会随着commitAt进行递增
// resetPendingIndex/clearPendingTasks方法 会对pendingIndex进行重置
// *************************************************************************
private long                      pendingIndex;

// *************************************************************************
// SegmentList 分段存储集合来着的,以128为分界线.
// *************************************************************************
private final SegmentList<Ballot> pendingMetaQueue   = new SegmentList<>(false);

(5). BallotBox.resetPendingIndex

public boolean resetPendingIndex(final long newPendingIndex) {
	final long stamp = this.stampedLock.writeLock();
	try {
        
		// 如果:pendingIndex不为零,并且,pendingMetaQueue里有数据的情况下,是不允许进行:resetPendingIndex调用的.
		if (!(this.pendingIndex == 0 && this.pendingMetaQueue.isEmpty())) {
			LOG.error("resetPendingIndex fail, pendingIndex={}, pendingMetaQueueSize={}.", this.pendingIndex, this.pendingMetaQueue.size());
			return false;
		}
		
		// lastCommittedIndex:代表着过半节点确认的index,这时进行重置的index是不能小于:lastCommittedIndex
		if (newPendingIndex <= this.lastCommittedIndex) {
			LOG.error("resetPendingIndex fail, newPendingIndex={}, lastCommittedIndex={}.", newPendingIndex,
				this.lastCommittedIndex);
			return false;
		}
		
	    // 比较简单,赋值
		this.pendingIndex = newPendingIndex;
		this.closureQueue.resetFirstIndex(newPendingIndex);
		return true;
	} finally {
		this.stampedLock.unlockWrite(stamp);
	}
}

(6). BallotBox.appendPendingTask

public boolean appendPendingTask(final Configuration conf, final Configuration oldConf, final Closure done) {
	final Ballot bl = new Ballot();
	// *********************************************************************************
	// Ballot辅助类,当调用:Ballot.init方法时:
	// 会把conf的PeerId收集,并,计算出投票数(即:过半以上的节点确认)
	// 会把oldConf的PeerId收集,并,计算出投票数(即:过半以上的节点确认)
	// *********************************************************************************
	if (!bl.init(conf, oldConf)) {
		LOG.error("Fail to init ballot.");
		return false;
	}

	final long stamp = this.stampedLock.writeLock();
	try {
		// 如果:pendingIndex为0以下,则返回Pending任务失败,所以,在添加任务之前,需要让pendingIndex向前推动下.
		if (this.pendingIndex <= 0) {
			LOG.error("Fail to appendingTask, pendingIndex={}.", this.pendingIndex);
			return false;
		}
		
		// ***********************************************************************
		// 添加分段集合里
		// ***********************************************************************
		this.pendingMetaQueue.add(bl);
		this.closureQueue.appendPendingClosure(done);
		return true;
	} finally {
		this.stampedLock.unlockWrite(stamp);
	}
}

(7). Ballot.init

Ballot是一个辅助投票的类,初始化方法会根据节点的数量计算出:投票数.

public class Ballot {	
	// *************************************************************************
	// 所有的节点
	// *************************************************************************
	private final List<UnfoundPeerId> peers    = new ArrayList<>();
	// *************************************************************************
	// 应投票的数量(要求:过半),呆会看init方法就知道了
	// *************************************************************************
	private int                       quorum;
	
	
	private final List<UnfoundPeerId> oldPeers = new ArrayList<>();
	private int                       oldQuorum;
	
	public boolean init(final Configuration conf, final Configuration oldConf) {
		// 由于是new出来的,所以,是线程安全,先清空两个集合和quorum
		this.peers.clear();
		this.oldPeers.clear();
		this.quorum = this.oldQuorum = 0;
		
		
		int index = 0;
		if (conf != null) {
			for (final PeerId peer : conf) {
				this.peers.add(new UnfoundPeerId(peer, index++, false));
			}
		}
		// *************************************************************************
		// peers = localhost:8081,localhost:8082,localhost:8083
		// peers.size() = 3
		// quorum = 3 / 2 + 1
		// 也就是说投票数量要达到:2
		// *************************************************************************
		this.quorum = this.peers.size() / 2 + 1;


		if (oldConf == null) {
			return true;
		}
		index = 0;
		for (final PeerId peer : oldConf) {
			this.oldPeers.add(new UnfoundPeerId(peer, index++, false));
		}
		this.oldQuorum = this.oldPeers.size() / 2 + 1;
		
		return true;
	}// end init
	
} // end 	Ballot

(8). Ballot大体数据结构如下

根据上面的参数,大体推测出的JSON结构体,主要是分便理解和分析.

{
   peers: [   
	   {   peerId: { endpoint: { ip: "localhost" , port: 8081 } } , found : false , index : 0  },
	   {   peerId: { endpoint: { ip: "localhost" , port: 8082 } } , found : false , index: 1  },
	   {   peerId: { endpoint: { ip: "localhost" , port: 8083 } } , found : false , index: 2  }
   ],
   quorum: 2,
   oldPeers: [
   	   {   peerId: { endpoint: { ip: "localhost" , port: 8081 } } , found : false , index : 0  }
   ],
   oldQuorum: 1
}

(9). 总结

BallotBox追加任务时,会根据节点的数量计算出投票数(quorum).