(1). 概述

本来应该要剖析StateMachine的,但是,在剖析FSMCaller(onCommitted方法)时,有涉及到一个类:IteratorImpl,所以,在这一篇主要剖析它了.

(2). IteratorImplTest

@RunWith(value = MockitoJUnitRunner.class)
public class IteratorImplTest {

    private static final String GROUP_ID = "group001";
    private IteratorImpl        iter;
    @Mock
    private NodeImpl            node;
    @Mock
    private FSMCallerImpl       fsmCaller;
    @Mock
    private LogManager          logManager;
    private List<Closure>       closures;
    private AtomicLong          applyingIndex;

    @Before
    public void setup() {
        Mockito.when(this.node.getGroupId()).thenReturn(GROUP_ID);
        Mockito.when(this.fsmCaller.getNode()).thenReturn(node);
        this.applyingIndex = new AtomicLong(0);

        this.closures = new ArrayList<>();
        for (int i = 0; i < 11; i++) {
            this.closures.add(new MockClosure());
            final LogEntry log = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_NO_OP);
            log.getId().setIndex(i);
            log.getId().setTerm(1);
			// *****************************************************************
			// 创建12条日志,当调用:logManager.getEntry(i)时,返回:LogEntry
			// *****************************************************************
            Mockito.when(this.logManager.getEntry(i)).thenReturn(log);
        }
        
		// 10L 代表着最后commitIndex
        this.iter = new IteratorImpl(fsmCaller, logManager, closures, 0L, 0L, 10L, applyingIndex);
    } // end setup

    @Test
    public void testNext() {
        int i = 1;
        while (iter.isGood()) {
            assertEquals(i, iter.getIndex());
            assertNotNull(iter.done());
            final LogEntry log = iter.entry();
            assertEquals(i, log.getId().getIndex());
            assertEquals(1, log.getId().getTerm());
            iter.next();
            i++;
        }
        assertEquals(i, 11);
    } // end testNext
} // end 	IteratorImplTest

(3). IteratorImpl构造器

// start  IteratorImpl 属性
private long                currentIndex;
private final long          committedIndex;
private LogEntry            currEntry = new LogEntry(); // blank entry

// start  IteratorImpl 构造器
public IteratorImpl(final FSMCallerImpl fsmCaller, final LogManager logManager, final List<Closure> closures,
                        final long firstClosureIndex, final long lastAppliedIndex, final long committedIndex,
                        final AtomicLong applyingIndex) {
	super();
	this.fsmCaller = fsmCaller;
	this.fsmCommittedIndex = -1L;
	this.logManager = logManager;
	this.closures = closures;
	// firstClosureIndex = 0
	this.firstClosureIndex = firstClosureIndex;
	// **********************************************************
	// 这个:currentIndex是一个游标来着的
	// 每调用:next()方法一次,currentIndex进行自增,自增之后,是不可以大于:committedIndex
	// currentIndex = 0
	// **********************************************************
	this.currentIndex = lastAppliedIndex;

	// **********************************************************
	// 这个:committedIndex代表着最后提交的index,也就是最大可读取的位置
	// committedIndex = 0
	// **********************************************************
	this.committedIndex = committedIndex;
	// applyingIndex = 0
	this.applyingIndex = applyingIndex;

	// **********************************************************
	// 获取下一个Entry数据
	// **********************************************************
	next();
}

(4). IteratorImpl.next

也就是说:IteratorImpl的初始化的时候,会从LogManager加载一条Entry,加载条件是下一条(lastAppliedIndex + 1).

public void next() {
	// 释放当前的Entry
	this.currEntry = null; //release current entry
	
	// currentIndex = 0
	// committedIndex = 10 
	//get next entry
	if (this.currentIndex <= this.committedIndex) {
		// currentIndex = 1 
		++this.currentIndex;
		
		// currentIndex = 1
		// committedIndex = 10 
		if (this.currentIndex <= this.committedIndex) {
			try {
				// 从logManager中获取:Entry
				this.currEntry = this.logManager.getEntry(this.currentIndex);
				if (this.currEntry == null) {
					// ... ... 
				}
			} catch (final LogEntryCorruptedException e) {
				// ... ... 
			}
			// 修改:applyingIndex为:currentIndex(1)
			this.applyingIndex.set(this.currentIndex);
		}
	}
}

(5). IteratorImpl.isGood

isGood用来验证:currentIndex要小于committedIndex,只有这样,才能继续调用:next获取数据

public boolean isGood() {
	// currentIndex = 1
	// committedIndex = 10 
	return this.currentIndex <= this.committedIndex && !hasError();
}

(6). IteratorImpl.entry

public LogEntry entry() {
	return this.currEntry;
}

(7). 总结

IteratorImpl的目的是:允许加载:currentIndex~committedIndex之间的Entry,每调用一次next方法,游标(currentIndex)进行移动,然后,通过LogManager加载游标(currentIndex)处的数据(Entry),再通过currEntry进行Hold住这个Entry.