(1). 概述

前面剖析了与网络相关的内容,按理来说:应该要开始剖析Replicator,但却发现,如果用Mock来走测试案例剖析的话,自我感觉太虚了, 所以,换种方式来解读源码,解析一套网络流程,在剖析之前,要先把Node里所依赖的一些对象给先剖析掉,以免受影响,所以,这一篇主要剖析 HashedWheelTimer,没有想到的是:JRAFT是直接拷贝Netty的代码.

(2). HashedWheelTimer简单使用

public class HashedWheelTimerTest {

    @Test
    public void testHashedWheelTimer() throws InterruptedException {
        HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(1, TimeUnit.SECONDS, 12);
        hashedWheelTimer.start();

        Timeout timeout = hashedWheelTimer.newTimeout((t) -> {
            System.out.println("--------------------->" + t);
        }, 10, TimeUnit.SECONDS);

        TimeUnit.SECONDS.sleep(30);

        // timeout.cancel();

        hashedWheelTimer.stop();
    }
}

(3). HashedWheelTimer初始化

public HashedWheelTimer(
                 ThreadFactory threadFactory, 
				 // 每一格之间的时间间隔
				 // 1
				 long tickDuration, 
				 // TimeUnit.SECONDS
				 TimeUnit unit, 
				 // 12 
				 int ticksPerWheel,
				 long maxPendingTimeouts) {
	if (threadFactory == null) {
		throw new NullPointerException("threadFactory");
	}
	if (unit == null) {
		throw new NullPointerException("unit");
	}
	if (tickDuration <= 0) {
		throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
	}
	if (ticksPerWheel <= 0) {
		throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
	}

    // **************************************************************************
	// 1. 创建,并初始化:HashedWheelBucket[]数组
	// **************************************************************************
	// Normalize ticksPerWheel to power of two and initialize the wheel.
	wheel = createWheel(ticksPerWheel);
	// 在后面,会拿着tick和mask进行与计算,最终的值一定是在mask范围之内. 
	mask = wheel.length - 1;


	// **************************************************************************
	// 把时间转换成纳秒: 1秒 = 1000000000纳秒
	// **************************************************************************
	// Convert tickDuration to nanos.
	this.tickDuration = unit.toNanos(tickDuration);

	// Prevent overflow.
	if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
		throw new IllegalArgumentException(String.format(
			"tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE
																						/ wheel.length));
	}
	
	// *******************************************************
	// 2. 通过工厂,创建Work线程
	// *******************************************************
	workerThread = threadFactory.newThread(worker);

	this.maxPendingTimeouts = maxPendingTimeouts;

	if (instanceCounter.incrementAndGet() > INSTANCE_COUNT_LIMIT
		&& warnedTooManyInstances.compareAndSet(false, true)) {
		reportTooManyInstances();
	}
} // end HashedWheelTimer构造器

private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
	// 对进行校验,控制在2的30幂
	if (ticksPerWheel <= 0) {
		throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
	}
	if (ticksPerWheel > 1073741824) {
		throw new IllegalArgumentException("ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
	}
	
	// 
	ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
	HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
	for (int i = 0; i < wheel.length; i++) {
		wheel[i] = new HashedWheelBucket();
	}
	return wheel;
} // end createWheel

private static int normalizeTicksPerWheel(int ticksPerWheel) {
	int normalizedTicksPerWheel = 1;
	while (normalizedTicksPerWheel < ticksPerWheel) {
		normalizedTicksPerWheel <<= 1;
	}
	return normalizedTicksPerWheel;
} // end normalizeTicksPerWheel

(4). HashedWheelTimer.newTimeout

HashedWheelTimer添加任务.

// timeouts
private final Queue<HashedWheelTimeout>  timeouts = new ConcurrentLinkedQueue<>();

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
	// ... ...

   // 启动Work线程
	start();

	// ... ...
	
	// deadline 
	// 执行时间为: 当前时间(纳秒) + 延迟时间 - 启动时间
	// 所以,最终执行时间为: 逻辑时间哈.
	long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
	
	// **************************************************************************
	// 把task转换成:HashedWheelTimeout,添加到队列里(timeouts)
	// **************************************************************************
	HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
	timeouts.add(timeout);
	return timeout;
}// end newTimeout


// HashedWheelTimer属性部份
// workerStateUpdater
private static final AtomicIntegerFieldUpdater<HashedWheelTimer> workerStateUpdater  = 
                                AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class,"workerState");
// work状态								
private volatile int                                             workerState;
// work初始化
public static final int                                          WORKER_STATE_INIT      = 0;
// work启动
public static final int                                          WORKER_STATE_STARTED   = 1;
// work关闭
public static final int                                          WORKER_STATE_SHUTDOWN  = 2;
// 启动初始化latch
private final CountDownLatch                                     startTimeInitialized   = new CountDownLatch(1);

public void start() {
	switch (workerStateUpdater.get(this)) {
		case WORKER_STATE_INIT:
		    // **************************************************************************
			// 实际比较简单,通过元子性验证workerState的状态是初始化时,设置成:启动中,并且,启动work线程
			// **************************************************************************
			if (workerStateUpdater.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
				workerThread.start();
			}
			break;
		case WORKER_STATE_STARTED:
			break;
		case WORKER_STATE_SHUTDOWN:
			throw new IllegalStateException("cannot be started once stopped");
		default:
			throw new Error("Invalid WorkerState");
	}

	// Wait until the startTime is initialized by the worker.
	while (startTime == 0) {
		try {
			
			startTimeInitialized.await();
		} catch (InterruptedException ignore) {
		}
	}
} // end start

(5). Timeout.cancel

添加任务之后,会返回:Timeout,我们可以Hold住这个对象,进行任务的取消.


// 取消任务列表
private final Queue<HashedWheelTimeout> cancelledTimeouts = new ConcurrentLinkedQueue<>();

// 取消任务
public boolean cancel() {
	// 比较状态,
	if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
		return false;
	}
	
	// 把要取消的任务,添加到队列里.
	timer.cancelledTimeouts.add(this);
	return true;
}

(6). Work.run

private final class Worker implements Runnable {
	private final Set<Timeout> unprocessedTimeouts = new HashSet<>();

	private long               tick;

	@Override
	public void run() {
		
		// 当前的纳秒数
		// Initialize the startTime.
		startTime = System.nanoTime();
		if (startTime == 0) {
			// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
			startTime = 1;
		}

		// Notify the other threads waiting for the initialization at start().
		// 配合前面的添加任务.
		startTimeInitialized.countDown();

		do {
			// ************************************************************************
			// 1. 计算出下一轮的时间
			// ************************************************************************
			final long deadline = waitForNextTick();
			if (deadline > 0) {
				// ***************************************************************
				// 2. 这里与运算,所以,最终的idx是:0~511之间,这样,就实际对应的着数组512以内.
				// ***************************************************************
				int idx = (int) (tick & mask);
				
				// ***************************************************************
				// 3. 处理取消的任务
				// ***************************************************************
				processCancelledTasks();
				
				// ***************************************************************
				// 4. 根据idx从下标中取出来,与第2步是对应的.
				// ***************************************************************
				HashedWheelBucket bucket = wheel[idx];
				
				// ***************************************************************
				// 5. 把队列中的任务,移到桶里.
				// ***************************************************************
				transferTimeoutsToBuckets();
				bucket.expireTimeouts(deadline);
				tick++;
			}
		} while (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

		// Fill the unprocessedTimeouts so we can return them from stop() method.
		for (HashedWheelBucket bucket : wheel) {
			bucket.clearTimeouts(unprocessedTimeouts);
		}
		
		
		for (;;) {
			HashedWheelTimeout timeout = timeouts.poll();
			if (timeout == null) {
				break;
			}
			if (!timeout.isCancelled()) {
				unprocessedTimeouts.add(timeout);
			}
		}
		
		processCancelledTasks();
	}// end run
}		

(7). Work.waitForNextTick

private long waitForNextTick() {
	// tick自增
	// tickDuration: 1秒 = 1000000000纳秒
	// 这一步的做法是什么呢? 实际就是下一秒哈
	long deadline = tickDuration * (tick + 1);

	for (;;) {
		final long currentTime = System.nanoTime() - startTime;
		long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

		if (sleepTimeMs <= 0) {
			if (currentTime == Long.MIN_VALUE) {
				return -Long.MAX_VALUE;
			} else {
				return currentTime;
			}
		}
		
		try {
			Thread.sleep(sleepTimeMs);
		} catch (InterruptedException ignored) {
			if (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
				return Long.MIN_VALUE;
			}
		}
	}
}

(8). Work.transferTimeoutsToBuckets

private void transferTimeoutsToBuckets() {
	// 额,最多拿10W个任务处理
	// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
	// adds new timeouts in a loop.
	for (int i = 0; i < 100000; i++) {
		HashedWheelTimeout timeout = timeouts.poll();
		// 没有任务
		if (timeout == null) {
			// all processed
			break;
		}
		
		// 任务被取消掉了
		if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
			// Was cancelled in the meantime.
			continue;
		}

		
		long calculated = timeout.deadline / tickDuration;
		timeout.remainingRounds = (calculated - tick) / wheel.length;

		final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
		int stopIndex = (int) (ticks & mask);

		// ***********************************************************************************  
		// 找到对应的桶,添加timeout
		// ***********************************************************************************  
		HashedWheelBucket bucket = wheel[stopIndex];
		bucket.addTimeout(timeout);
	}
}

(9). Work.expireTimeouts

public void expireTimeouts(long deadline) {
	HashedWheelTimeout timeout = head;

	// process all timeouts
	while (timeout != null) {
		HashedWheelTimeout next = timeout.next;
		if (timeout.remainingRounds <= 0) {
			next = remove(timeout);
			if (timeout.deadline <= deadline) {
				// ******************************************************
				// 执行TimerTask.run方法
				// ******************************************************
				timeout.expire();
			} else {
				// The timeout was placed into a wrong slot. This should never happen.
				throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)",
					timeout.deadline, deadline));
			}
		} else if (timeout.isCancelled()) {
			next = remove(timeout);
		} else {
			timeout.remainingRounds--;
		}
		timeout = next;
	}
}

(10). 总结

HashedWheelTimer的使用还是有限的,是以HashedWheelTimer启动为基准,一轮一轮的走下去来着的,而且,添加任务时,是扔到队列里,Work线程会一直走下去,取出队列里的数据,复制到桶里,并遍历桶(链表)执行:TimerTask.