(1). Overview

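The previous part showed that when JedisConnectionFactory is initialized, the underlying client keeps a Map<slot, JedisPool> that records which JedisPool serves each slot. So what happens underneath when we execute a set command through Jedis?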

(2). Redis set operation

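// Before diving into the source, one thing to note: in Spring we have used JdbcTemplate, which requires a DataSource to be configured.
// Likewise, StringRedisTemplate needs a RedisConnectionFactory when it is initialized.
// Where is StringRedisTemplate initialized?
// The answer: RedisAutoConfiguration.stringRedisTemplate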

// 1. Obtain the StringRedisTemplate
StringRedisTemplate stringRedisTemplate = ctx.getBean(StringRedisTemplate.class);
// 2. Create a BoundValueOperations bound to the key "c"
BoundValueOperations<String, String> ops = stringRedisTemplate.boundValueOps("c");
// 3. Execute the set operation
ops.set("cccc");

(3). StringRedisTemplate

public BoundValueOperations<K, V> boundValueOps(K key) {
	// Defined in RedisTemplate, which StringRedisTemplate extends; a new instance is created on every call
	return new DefaultBoundValueOperations<>(key, this);
}// end boundValueOps

// ************************************************************
// StringRedisTemplate.boundValueOps returns a DefaultBoundValueOperations
// ************************************************************
DefaultBoundValueOperations(K key, RedisOperations<K, V> operations) {
	super(key, operations);
	// Calls back into RedisTemplate.opsForValue
	this.ops = operations.opsForValue();
} // end DefaultBoundValueOperations

// RedisTemplate.opsForValue
public ValueOperations<K, V> opsForValue() {
	if (valueOps == null) {
		// *********************************************************
		// new DefaultValueOperations
		// *********************************************************
		valueOps = new DefaultValueOperations<>(this);
	}
	return valueOps;
} // end RedisTemplate.opsForValue

(4). DefaultValueOperations

public void set(K key, V value) {
	// 1. Serialize the value
	byte[] rawValue = rawValue(value);
	
	// 3. Call execute
	execute(
	    // 2. Create a callback
	    new ValueDeserializingRedisCallback(key) {
			@Override
			protected byte[] inRedis(byte[] rawKey, RedisConnection connection) {
				connection.set(rawKey, rawValue);
				return null;
			}
	   }, 
	   true);
} // end set


// 4. Delegate to StringRedisTemplate
<T> T execute(RedisCallback<T> callback, boolean b) {
	// **************************************************************
	// Call StringRedisTemplate.execute and pass the callback along
	// **************************************************************
	return template.execute(callback, b);
}

(5). StringRedisTemplate

// 1. execute
public <T> T execute(RedisCallback<T> action, boolean exposeConnection) {
	return execute(action, exposeConnection, false);
} 


public <T> T execute(
          // the callback
          RedisCallback<T> action, 
		  // true
		  boolean exposeConnection, 
		  // false
		  boolean pipeline) {
	
	// 2. Obtain the RedisConnectionFactory
	RedisConnectionFactory factory = getRequiredConnectionFactory();
	RedisConnection conn = null;
	try {
		if (enableTransactionSupport) { //false
			// only bind resources in case of potential transaction synchronization
			conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
		} else {
			// Obtain a RedisConnection
			conn = RedisConnectionUtils.getConnection(factory);
		}
		
		// false
		boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
		// Pre-processing hook provided by Spring
		RedisConnection connToUse = preProcessConnection(conn, existingConnection);
		
		// Whether the connection is already pipelined
		// false
		boolean pipelineStatus = connToUse.isPipelined();
		if (pipeline && !pipelineStatus) {
			connToUse.openPipeline();
		}
		
		// exposeConnection is true, so the RedisConnection above is used directly
		// (why a proxy would otherwise be created is left aside for now)
		RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
		
		// *************************************************************
		// Invoke the RedisCallback created in step (4) by DefaultValueOperations.set (implemented by ValueDeserializingRedisCallback), passing it the RedisConnection
		// *************************************************************
		T result = action.doInRedis(connToExpose);
		
		// Pipeline handling.
		// close pipeline
		if (pipeline && !pipelineStatus) {
			connToUse.closePipeline();
		}
		
		// Post-processing hook provided by Spring.
		// TODO: any other connection processing?
		return postProcessResult(result, connToUse, existingConnection);
	} finally {
		// Release conn
		RedisConnectionUtils.releaseConnection(conn, factory);
	}
} // end execute
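
The shape of execute above (obtain a connection, hand it to the callback, release it in finally) can be sketched without Spring. This is a minimal illustration only: FakeConnection and this execute method are hypothetical stand-ins, not Spring Data Redis types, and transaction binding, pipelining and the pre/post hooks are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ExecuteAroundSketch {
    /** Hypothetical stand-in for RedisConnection; it only records what happens to it. */
    static class FakeConnection {
        final List<String> log;

        FakeConnection(List<String> log) {
            this.log = log;
            log.add("open");
        }

        void set(String key, String value) {
            log.add("SET " + key + " " + value);
        }

        void close() {
            log.add("release");
        }
    }

    /** Hypothetical stand-in for RedisTemplate.execute: acquire, run the callback, always release. */
    static <T> T execute(List<String> log, Function<FakeConnection, T> action) {
        FakeConnection conn = new FakeConnection(log);
        try {
            return action.apply(conn);
        } finally {
            // Runs whether the callback returned normally or threw.
            conn.close();
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        execute(log, conn -> {
            conn.set("c", "cccc");
            return null;
        });
        System.out.println(log); // [open, SET c cccc, release]
    }
}
```

The caller never touches connection lifecycle; it only supplies the work to do, which is the same division of labour as between DefaultValueOperations.set and the template.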

(6). ValueDeserializingRedisCallback

abstract class AbstractOperations<K, V> {
	abstract class ValueDeserializingRedisCallback implements RedisCallback<V> {
		private Object key;

		public ValueDeserializingRedisCallback(Object key) {
			this.key = key;
		}
		
		// 1. doInRedis is invoked
		public final V doInRedis(RedisConnection connection) {
			// ************************************************************
			// 2. rawKey serializes the key.
			// 3. The RedisConnection and rawKey are handed to the subclass (the callback created in DefaultValueOperations.set)
			// ************************************************************
			byte[] result = inRedis(rawKey(key), connection);
			return deserializeValue(result);
		}

		@Nullable
		protected abstract byte[] inRedis(byte[] rawKey, RedisConnection connection);
	}
}
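
ValueDeserializingRedisCallback is a template method: the final doInRedis fixes the order (serialize the key, delegate, deserialize the reply) and subclasses fill in only inRedis. A stripped-down sketch of that idea follows; KeyCallback and run are hypothetical names, UTF-8 strings stand in for the configured serializers, and the explicit null check is an assumption of this sketch rather than a statement about Spring's deserializeValue.

```java
import java.nio.charset.StandardCharsets;

public class KeyCallbackSketch {
    /** Hypothetical, simplified counterpart of ValueDeserializingRedisCallback. */
    abstract static class KeyCallback {
        private final String key;

        KeyCallback(String key) {
            this.key = key;
        }

        // Template method: serialize the key, delegate to the subclass, deserialize the reply.
        final String run() {
            byte[] rawKey = key.getBytes(StandardCharsets.UTF_8);
            byte[] result = inRedis(rawKey);
            return result == null ? null : new String(result, StandardCharsets.UTF_8);
        }

        // The only step a concrete callback has to supply.
        protected abstract byte[] inRedis(byte[] rawKey);
    }

    public static void main(String[] args) {
        // A write-style callback has nothing to return, like the one built in set().
        String setReply = new KeyCallback("c") {
            @Override
            protected byte[] inRedis(byte[] rawKey) {
                return null;
            }
        }.run();

        // A read-style callback returns raw bytes, which run() turns back into a String.
        String getReply = new KeyCallback("c") {
            @Override
            protected byte[] inRedis(byte[] rawKey) {
                return "cccc".getBytes(StandardCharsets.UTF_8);
            }
        }.run();

        System.out.println(setReply + " " + getReply); // null cccc
    }
}
```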

(7). DefaultValueOperations

public void set(K key, V value) {
	byte[] rawValue = rawValue(value);
	execute(
	   
	   new ValueDeserializingRedisCallback(key) {
		// ***************************************************************
		// 1. ValueDeserializingRedisCallback calls back into inRedis
		// ***************************************************************
		@Override
		protected byte[] inRedis(byte[] rawKey, RedisConnection connection) {
			// 2. Ultimately delegates to the RedisConnection, which here is
			// org.springframework.data.redis.connection.DefaultStringRedisConnection
			connection.set(rawKey, rawValue);
			return null;
		}
	}, true);
}

(8). DefaultStringRedisConnection

public Boolean set(byte[] key, byte[] value) {
	// ******************************************************************
	// 1. Delegates to JedisClusterConnection.set
	//    JedisClusterConnection should look familiar; it appeared in the previous chapter
	// ******************************************************************
	return convertAndReturn(delegate.set(key, value), identityConverter);
}

(9). JedisClusterConnection

// Note: this default method is inherited from the DefaultedRedisConnection interface
default Boolean set(byte[] key, byte[] value) {
	// ****************************************
	// 1. Create a JedisClusterStringCommands
	// 2. Call JedisClusterStringCommands.set
	// ****************************************
	return stringCommands().set(key, value);
}

public RedisStringCommands stringCommands() {
	return new JedisClusterStringCommands(this);
}

(10). JedisClusterStringCommands

public Boolean set(byte[] key, byte[] value) {
	Assert.notNull(value, "Value must not be null!");
	try {
		// 1. connection.getCluster() returns the JedisCluster; its set is implemented in BinaryJedisCluster
		return Converters.stringToBoolean(connection.getCluster().set(key, value));
	} catch (Exception ex) {
		throw convertJedisAccessException(ex);
	}
}

(11). BinaryJedisCluster

public String set(final byte[] key, final byte[] value) {
	// Create a JedisClusterCommand and call its runBinary method
	return new JedisClusterCommand<String>(connectionHandler, maxAttempts) {
	  @Override
	  public String execute(Jedis connection) {
		return connection.set(key, value);
	  }
	}.runBinary(key);
}

(12). JedisClusterCommand

public T runBinary(byte[] key) {
	if (key == null) {
		throw new JedisClusterException("No way to dispatch this command to Redis Cluster.");
	}
	
	// **************************************************
	// runWithRetries
	// **************************************************
	return runWithRetries(key, this.maxAttempts, false, false);
}// end runBinary

private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) {
	if (attempts <= 0) {
	  throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
	}

	Jedis connection = null;
	try {

	  if (asking) {
		// TODO: Pipeline asking with the original command to make it
		// faster....
		connection = askConnection.get();
		connection.asking();

		// if asking success, reset asking flag
		asking = false;
	  } else {
		if (tryRandomNode) {
		  connection = connectionHandler.getConnection();
		} else {
			// ******************************************************
			// 1. Compute the slot from the key with CRC16
			// 2. Delegate to JedisSlotBasedConnectionHandler.getConnectionFromSlot to obtain the Jedis for that slot
			// ******************************************************
		  connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
		}
	  }

	  return execute(connection);

	} catch (JedisNoReachableClusterNodeException jnrcne) {
	  throw jnrcne;
	} catch (JedisConnectionException jce) {    // On a connection failure, retry until maxAttempts (5 by default) is used up
	  // release current connection before recursion
	  releaseConnection(connection);
	  connection = null;

	  if (attempts <= 1) {
		this.connectionHandler.renewSlotCache();
		throw jce;
	  }
	  return runWithRetries(key, attempts - 1, tryRandomNode, asking);
	} catch (JedisRedirectionException jre) {   // Handle MOVED/ASK redirections.
	  // if MOVED redirection occurred,
	  if (jre instanceof JedisMovedDataException) {
		this.connectionHandler.renewSlotCache(connection);
	  }

	  // release current connection before recursion or renewing
	  releaseConnection(connection);
	  connection = null;

	  if (jre instanceof JedisAskDataException) {
		asking = true;
		askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
	  } else if (jre instanceof JedisMovedDataException) {
	  } else {
		throw new JedisClusterException(jre);
	  }
	  return runWithRetries(key, attempts - 1, false, asking);
	} finally {
	  releaseConnection(connection);
	}
}
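
The MOVED branch of runWithRetries boils down to: try the node the local cache points at, and if that node says the slot lives elsewhere, fix the cache and try again with one attempt fewer. The simulation below is a sketch under heavy simplification. MovedException, send and the two maps are hypothetical, ASK handling and connection errors are omitted, and only the single slot is patched, whereas Jedis calls renewSlotCache to refresh the slot table.

```java
import java.util.HashMap;
import java.util.Map;

public class RedirectRetrySketch {
    /** Hypothetical stand-in for JedisMovedDataException. */
    static class MovedException extends RuntimeException {
        final String targetNode;

        MovedException(String targetNode) {
            super("MOVED " + targetNode);
            this.targetNode = targetNode;
        }
    }

    // Client-side view of slot ownership; it may be stale.
    final Map<Integer, String> slotCache = new HashMap<>();
    // Real ownership on the simulated server side.
    final Map<Integer, String> actualOwner = new HashMap<>();
    int attemptsUsed = 0;

    /** Pretend to send a command to a node; a node that does not own the slot redirects. */
    String send(String node, int slot) {
        String owner = actualOwner.get(slot);
        if (!owner.equals(node)) {
            throw new MovedException(owner);
        }
        return "OK from " + node;
    }

    String runWithRetries(int slot, int attempts) {
        if (attempts <= 0) {
            throw new IllegalStateException("Too many Cluster redirections?");
        }
        attemptsUsed++;
        try {
            return send(slotCache.get(slot), slot);
        } catch (MovedException moved) {
            // Simplification: patch this one slot instead of re-reading the whole table.
            slotCache.put(slot, moved.targetNode);
            return runWithRetries(slot, attempts - 1);
        }
    }

    public static void main(String[] args) {
        RedirectRetrySketch client = new RedirectRetrySketch();
        client.slotCache.put(42, "node-a");   // stale entry
        client.actualOwner.put(42, "node-b"); // the slot has moved
        System.out.println(client.runWithRetries(42, 5)); // OK from node-b
        System.out.println(client.attemptsUsed);          // 2
    }
}
```

After the first redirect the cache is correct, so later commands for the same slot go straight to the right node.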

(13). JedisClusterCRC16

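// In cluster mode, if the key contains a non-empty {...} section (a hash tag), only the content between the braces is hashed, not the whole key.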
public static int getSlot(byte[] key) {
	int s = -1;
	int e = -1;
	boolean sFound = false;
	for (int i = 0; i < key.length; i++) {
	  if (key[i] == '{' && !sFound) {
		s = i;
		sFound = true;
	  }
	  if (key[i] == '}' && sFound) {
		e = i;
		break;
	  }
	}
	if (s > -1 && e > -1 && e != s + 1) {
	  return getCRC16(key, s + 1, e) & (16384 - 1);
	}
	return getCRC16(key) & (16384 - 1);
}
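
To see the hash-tag rule in action without a cluster, the slot computation can be reproduced in a few lines. This is a sketch, not the Jedis implementation: SlotSketch, crc16 and slotOf are illustrative names, and the CRC is computed bit by bit (CRC16-CCITT/XMODEM, polynomial 0x1021, initial value 0) rather than with a lookup table.

```java
import java.nio.charset.StandardCharsets;

public class SlotSketch {
    /** CRC16-CCITT (XMODEM) over bytes[start, end), one bit at a time. */
    static int crc16(byte[] bytes, int start, int end) {
        int crc = 0;
        for (int i = start; i < end; i++) {
            crc ^= (bytes[i] & 0xFF) << 8;
            for (int bit = 0; bit < 8; bit++) {
                crc = ((crc & 0x8000) != 0) ? ((crc << 1) ^ 0x1021) : (crc << 1);
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    /** Slot of a key, honouring the first non-empty {...} hash tag if there is one. */
    static int slotOf(String key) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        int s = -1;
        int e = -1;
        for (int i = 0; i < k.length; i++) {
            if (k[i] == '{' && s < 0) {
                s = i;
            } else if (k[i] == '}' && s >= 0) {
                e = i;
                break;
            }
        }
        if (s > -1 && e > -1 && e != s + 1) {
            return crc16(k, s + 1, e) & (16384 - 1);
        }
        return crc16(k, 0, k.length) & (16384 - 1);
    }

    public static void main(String[] args) {
        // Keys sharing a hash tag land in the same slot.
        System.out.println(slotOf("{user1000}.following") == slotOf("{user1000}.followers")); // true
        // The tag alone hashes to that same slot.
        System.out.println(slotOf("{user1000}.following") == slotOf("user1000")); // true
    }
}
```

This is why multi-key operations in cluster mode are usually written with a shared hash tag: it forces the related keys onto one node.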

(14). JedisSlotBasedConnectionHandler

public Jedis getConnectionFromSlot(int slot) {
	// Delegate to JedisClusterInfoCache
    JedisPool connectionPool = cache.getSlotPool(slot);
    if (connectionPool != null) {
      return connectionPool.getResource();
    } else {
      renewSlotCache(); //It's abnormal situation for cluster mode, that we have just nothing for slot, try to rediscover state
      connectionPool = cache.getSlotPool(slot);
      if (connectionPool != null) {
        return connectionPool.getResource();
      } else {
        //no choice, fallback to new connection to random node
        return getConnection();
      }
    }
}

(15). JedisClusterInfoCache

// ***************************************************************
// This class was the focus of the previous section: at startup, every slot is initialized here.
// The map key is the slot
// The map value is the JedisPool
// ***************************************************************
private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
public JedisPool getSlotPool(int slot) {
    r.lock();
    try {
      return slots.get(slot);
    } finally {
      r.unlock();
    }
}
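
The r.lock()/r.unlock() pair in getSlotPool is the read half of a read-write lock, so many threads can look up slots concurrently while a refresh holds the write half exclusively. A self-contained sketch of that arrangement is shown here; SlotCacheSketch, getSlotNode and assignSlots are hypothetical names, and a node name String stands in for JedisPool.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class SlotCacheSketch {
    // slot -> node name (a String stands in for JedisPool in this sketch)
    private final Map<Integer, String> slots = new HashMap<>();
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock r = rwl.readLock();
    private final Lock w = rwl.writeLock();

    /** Lookup under the read lock: concurrent readers do not block each other. */
    String getSlotNode(int slot) {
        r.lock();
        try {
            return slots.get(slot);
        } finally {
            r.unlock();
        }
    }

    /** Assign an inclusive slot range to a node under the write lock. */
    void assignSlots(int from, int to, String node) {
        w.lock();
        try {
            for (int slot = from; slot <= to; slot++) {
                slots.put(slot, node);
            }
        } finally {
            w.unlock();
        }
    }

    public static void main(String[] args) {
        SlotCacheSketch cache = new SlotCacheSketch();
        // A typical three-master split of the 16384 slots.
        cache.assignSlots(0, 5460, "node-a");
        cache.assignSlots(5461, 10922, "node-b");
        cache.assignSlots(10923, 16383, "node-c");
        System.out.println(cache.getSlotNode(12182)); // node-c
    }
}
```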

(16). Summary

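1) At startup, Jedis initializes the slot-to-JedisPool mapping (kept in a Map).
2) When saving data (set), Jedis computes the CRC16 of the key and masks it with 16383 (equivalent to taking it modulo 16384) to get the slot, then looks up the JedisPool for that slot.
3) Jedis then borrows a Jedis instance from that JedisPool.
4) If the command fails with a JedisConnectionException on the last attempt, or with a MOVED redirection (JedisMovedDataException, a subclass of JedisRedirectionException), the slot cache in JedisClusterInfoCache is re-synchronized with Redis (CLUSTER SLOTS).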