(1). Overview

The previous article dissected StartupStep. This short article looks at one small part of ClusterServicesStep: how the AtomixCluster is created.

(2). ClusterServicesStep.startupInternal

void startupInternal(
      final BrokerStartupContext brokerStartupContext,
      final ConcurrencyControl concurrencyControl,
      final ActorFuture<BrokerStartupContext> startupFuture) {
	
	// ... ... 
	// ***********************************************************************
	// Build the AtomixCluster through a static factory method
	// ***********************************************************************
	final var atomix =
	        AtomixClusterFactory.fromConfiguration(brokerStartupContext.getBrokerConfiguration());
	// ... ... 
}

(3). AtomixClusterFactory.fromConfiguration

public static AtomixCluster fromConfiguration(final BrokerCfg configuration) {
	// ... ... 
	var atomixBuilder  = new AtomixClusterBuilder(new ClusterConfig());
	
	// ... ...
	return atomixBuilder.build();
}	

(4). AtomixClusterBuilder.build

public AtomixCluster build() {
	return new AtomixCluster(config, Version.from(VersionUtil.getVersion()));
}
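
The chain in sections (2) to (4) is a static factory that fills in a builder, whose build() simply hands the collected config to the constructor. The following is a minimal sketch of that shape only. Every name in it (DemoClusterConfig, DemoCluster, DemoClusterBuilder, DemoClusterFactory, fromSettings) is hypothetical and none of it is the real Zeebe or Atomix code.

```java
// Hypothetical stand-ins; these are not the real Zeebe/Atomix types.
final class DemoClusterConfig {
  String clusterId = "default-cluster";
  int port = 26502;
}

final class DemoCluster {
  final String clusterId;
  final int port;

  DemoCluster(final DemoClusterConfig config) {
    this.clusterId = config.clusterId;
    this.port = config.port;
  }
}

final class DemoClusterBuilder {
  private final DemoClusterConfig config;

  DemoClusterBuilder(final DemoClusterConfig config) {
    this.config = config;
  }

  DemoClusterBuilder withClusterId(final String clusterId) {
    config.clusterId = clusterId;
    return this;
  }

  DemoClusterBuilder withPort(final int port) {
    config.port = port;
    return this;
  }

  // The builder only hands the accumulated config to the constructor.
  DemoCluster build() {
    return new DemoCluster(config);
  }
}

final class DemoClusterFactory {
  private DemoClusterFactory() {}

  // Static factory: translate external settings into builder calls.
  static DemoCluster fromSettings(final String clusterId, final int port) {
    return new DemoClusterBuilder(new DemoClusterConfig())
        .withClusterId(clusterId)
        .withPort(port)
        .build();
  }
}
```

Callers only ever see the factory method, so the way the config is assembled can change without touching the startup step.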

(5). AtomixCluster constructor

public AtomixCluster(final ClusterConfig config, final Version version) {
	// *****************************************************************
	// This article focuses on buildMessagingService;
	// buildUnicastService is covered in the next article.
	// *****************************************************************
    this(config, version, buildMessagingService(config), buildUnicastService(config));
}

(6). AtomixCluster.buildMessagingService

protected static ManagedMessagingService buildMessagingService(final ClusterConfig config) {
	// config.getNodeConfig().getAddress() = 26502
    return new NettyMessagingService(config.getClusterId(), config.getNodeConfig().getAddress(), config.getMessagingConfig());
}

(7). NettyMessagingService constructors

public NettyMessagingService(
      final String cluster, final Address advertisedAddress, final MessagingConfig config) {
    // Delegates to the package-private constructor below.
    this(cluster, advertisedAddress, config, ProtocolVersion.latest());
}

NettyMessagingService(
  final String cluster,
  final Address advertisedAddress,
  final MessagingConfig config,
  final ProtocolVersion protocolVersion) {
	preamble = cluster.hashCode();
	this.advertisedAddress = advertisedAddress;
	this.protocolVersion = protocolVersion;
	this.config = config;

	openFutures = new CopyOnWriteArrayList<>();
	// *************************************************************************
	// As the class name suggests, ChannelPool is a pool of Netty Channels.
	// *************************************************************************
	channelPool = new ChannelPool(this::openChannel, config.getConnectionPoolSize());
	// ... ...
}

(8). NettyMessagingService.openChannel

private CompletableFuture<Channel> openChannel(final Address address) {
	return bootstrapClient(address);
}

(9). NettyMessagingService.bootstrapClient

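Judging by the name, this creates the client-side Channel. It is not called while the service is being constructed; it is invoked only once a request (deploying a process, or anything else) has to be sent.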

private CompletableFuture<Channel> bootstrapClient(final Address address) {
	// address: 0.0.0.0:26501
    final CompletableFuture<Channel> future = new OrderedFuture<>();
    final InetAddress resolvedAddress = address.address(true);
    if (resolvedAddress == null) {
      future.completeExceptionally(new IllegalStateException("Failed to bootstrap client (address " + address.toString() + " cannot be resolved)"));
      return future;
    }

    final Bootstrap bootstrap = new Bootstrap();
    bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    bootstrap.option( ChannelOption.WRITE_BUFFER_WATER_MARK,
        new WriteBufferWaterMark(10 * 32 * 1024, 10 * 64 * 1024));
    bootstrap.option(ChannelOption.SO_RCVBUF, 1024 * 1024);
    bootstrap.option(ChannelOption.SO_SNDBUF, 1024 * 1024);
    bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.option(ChannelOption.TCP_NODELAY, true);
    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
    bootstrap.group(clientGroup);
    bootstrap.channel(clientChannelClass);
	// 0.0.0.0:26501
    bootstrap.remoteAddress(resolvedAddress, address.port());
	// ********************************************************************
	// Initializer that configures the client Channel
	// ********************************************************************
    bootstrap.handler(new BasicClientChannelInitializer(future));
    final Channel channel =
        bootstrap
            .connect()
            .addListener(
                onConnect -> {
                  if (!onConnect.isSuccess()) {
                    future.completeExceptionally(
                        new ConnectException(
                            String.format("Failed to connect channel for address %s", address)));
                  }
                })
            .channel();

    
    channel
        .closeFuture()
        .addListener(
            onClose ->
                future.completeExceptionally(
                    new ConnectException(
                        String.format(
                            "Channel %s for address %s was closed unexpectedly before the request was handled",
                            channel, address))));

    return future;
}
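
The close listener above calls completeExceptionally on every close, even a normal one long after the connection was established. This is harmless because a CompletableFuture accepts only its first completion. The sketch below illustrates that property with a plain CompletableFuture<String> standing in for the OrderedFuture<Channel>. It assumes the future is completed with the channel once the handshake succeeds, which is a step elided from the excerpt; the class and method names are hypothetical.

```java
import java.net.ConnectException;
import java.util.concurrent.CompletableFuture;

final class SingleCompletionDemo {
  private SingleCompletionDemo() {}

  // Close arrives AFTER the (assumed) successful handshake completion.
  // Returns true if the late failure managed to change the outcome.
  static boolean closeAfterSuccessChangesResult() {
    final CompletableFuture<String> future = new CompletableFuture<>();
    future.complete("channel-1");
    // completeExceptionally returns false when the future is already completed.
    final boolean transitioned =
        future.completeExceptionally(new ConnectException("channel closed"));
    return transitioned || future.isCompletedExceptionally();
  }

  // Close arrives BEFORE the handshake finished: the caller sees the failure.
  static boolean closeBeforeSuccessFailsFuture() {
    final CompletableFuture<String> future = new CompletableFuture<>();
    future.completeExceptionally(new ConnectException("closed before handshake"));
    // A later success attempt is ignored as well.
    future.complete("channel-1");
    return future.isCompletedExceptionally();
  }

  public static void main(final String[] args) {
    System.out.println(closeAfterSuccessChangesResult());
    System.out.println(closeBeforeSuccessFailsFuture());
  }
}
```

So the listener only matters when the channel closes before the future has been completed, which matches the wording of its error message.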

(10). NettyMessagingService$BasicClientChannelInitializer

private class BasicClientChannelInitializer extends ChannelInitializer<SocketChannel> {
	@Override
	protected void initChannel(final SocketChannel channel) {
	  // ... ... 
      // *********************************************************************
	  // Handshake protocol
	  // *********************************************************************
	  channel.pipeline().addLast("handshake", new ClientHandshakeHandlerAdapter(future));
	  // ... ... 
	}
}

(11). NettyMessagingService$ClientHandshakeHandlerAdapter

private class ClientHandshakeHandlerAdapter extends HandshakeHandlerAdapter<ProtocolReply> {

    private final CompletableFuture<Channel> future;

    ClientHandshakeHandlerAdapter(final CompletableFuture<Channel> future) {
      this.future = future;
    }

    @Override
    public void channelActive(final ChannelHandlerContext context) throws Exception {
      log.debug(
          "Writing client protocol version {} for connection to {}",
          protocolVersion,
          context.channel().remoteAddress());
	  // ***************************************************************
	  // When the client Channel becomes active, send the handshake message.
	  // ***************************************************************
      writeProtocolVersion(context, protocolVersion);
    }
	
	// Writes the handshake message (preamble + protocol version)
	// void writeProtocolVersion(final ChannelHandlerContext context, final ProtocolVersion version) {
	//      final ByteBuf buffer = context.alloc().buffer(6);
	//      buffer.writeInt(preamble);
	//      buffer.writeShort(version.version());
	//      context.writeAndFlush(buffer);
	// }
	

    @Override
    public void channelRead(final ChannelHandlerContext context, final Object message)
        throws Exception {
      // Read the protocol version sent by the server.
      readProtocolVersion(context, (ByteBuf) message)
          .ifPresent(
              version -> {
                // First make sure the version maps to a known protocol version.
                final ProtocolVersion protocolVersion = ProtocolVersion.valueOf(version);
                if (protocolVersion != null) {
				  // **************************************************
                  // Create (or reuse) the connection and configure the pipeline
				  // **************************************************
                  activateProtocolVersion(context, getOrCreateClientConnection(context.channel()), protocolVersion);
                } else {
                  log.error("Failed to negotiate protocol version");
                  context.close();
                }
              });
    }
}
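
The commented-out writeProtocolVersion above shows the handshake frame: a 4-byte int preamble (cluster.hashCode(), see the constructor) followed by a 2-byte short version, 6 bytes in total. The sketch below reproduces that layout with java.nio.ByteBuffer instead of Netty's ByteBuf. The decode side is an assumption: the excerpt does not show readProtocolVersion, so rejecting a mismatched preamble is a guess at its behavior, and HandshakeFrame is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.util.OptionalInt;

final class HandshakeFrame {
  static final int LENGTH = Integer.BYTES + Short.BYTES; // 4 + 2 = 6 bytes

  private HandshakeFrame() {}

  // Mirrors writeProtocolVersion: preamble as int, then version as short.
  static byte[] encode(final String cluster, final short version) {
    return ByteBuffer.allocate(LENGTH)
        .putInt(cluster.hashCode())
        .putShort(version)
        .array();
  }

  // Assumed read side: reject frames that are too short or carry the
  // preamble of a different cluster, otherwise return the version.
  static OptionalInt decode(final String cluster, final byte[] frame) {
    if (frame.length < LENGTH) {
      return OptionalInt.empty();
    }
    final ByteBuffer buffer = ByteBuffer.wrap(frame);
    if (buffer.getInt() != cluster.hashCode()) {
      return OptionalInt.empty();
    }
    return OptionalInt.of(buffer.getShort());
  }
}
```

Because the preamble is derived from the cluster name, a node configured with a different cluster name produces a different preamble and can be turned away before any protocol negotiation happens.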

(12). NettyMessagingService$HandshakeHandlerAdapter.activateProtocolVersion

void activateProtocolVersion(
        final ChannelHandlerContext context,
        final Connection<M> connection,
        final ProtocolVersion protocolVersion) {
	  // **************************************************************************		
      // Key point: the encoder and decoder
	  // **************************************************************************		
      final MessagingProtocol protocol = protocolVersion.createProtocol(advertisedAddress);
      // Remove the handshake handler (this, e.g. NettyMessagingService$ClientHandshakeHandlerAdapter) from the pipeline
      context.pipeline().remove(this);
      context.pipeline().addLast("encoder", protocol.newEncoder());
      context.pipeline().addLast("decoder", protocol.newDecoder());
      context.pipeline().addLast("handler", new MessageDispatcher<>(connection));
}

(13). NettyMessagingService.getOrCreateClientConnection

// ... ...
private final Map<Channel, RemoteClientConnection> connections = Maps.newConcurrentMap();
// ... ...

private RemoteClientConnection getOrCreateClientConnection(final Channel channel) {
    RemoteClientConnection connection = connections.get(channel);
    if (connection == null) {
	  // ***********************************************************************
      // Wrap the client Channel in a RemoteClientConnection
	  // ***********************************************************************
      connection = connections.computeIfAbsent(channel, RemoteClientConnection::new);
      channel
          .closeFuture()
          .addListener(
              f -> {
                final RemoteClientConnection removedConnection = connections.remove(channel);
                if (removedConnection != null) {
                  removedConnection.close();
                }
              });
    }
    return connection;
}
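
The method above is a get-or-create on a concurrent map plus a cleanup hook that fires when the channel closes. The sketch below shows the same pattern with the JDK only. FakeChannel and FakeConnection are hypothetical stand-ins for Netty's Channel and for RemoteClientConnection, and a CompletableFuture plays the role of closeFuture().

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class ConnectionRegistry {
  // Stand-in for a Netty Channel; closeFuture completes when it is closed.
  static final class FakeChannel {
    final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

    void close() {
      closeFuture.complete(null);
    }
  }

  // Stand-in for RemoteClientConnection.
  static final class FakeConnection {
    final FakeChannel channel;
    volatile boolean closed;

    FakeConnection(final FakeChannel channel) {
      this.channel = channel;
    }

    void close() {
      closed = true;
    }
  }

  private final Map<FakeChannel, FakeConnection> connections = new ConcurrentHashMap<>();

  FakeConnection getOrCreate(final FakeChannel channel) {
    // Fast path: a plain lookup avoids computeIfAbsent on every message.
    FakeConnection connection = connections.get(channel);
    if (connection == null) {
      // computeIfAbsent guarantees a single connection per channel under races.
      connection = connections.computeIfAbsent(channel, FakeConnection::new);
      // Drop and close the connection once the channel goes away.
      channel.closeFuture.thenRun(
          () -> {
            final FakeConnection removed = connections.remove(channel);
            if (removed != null) {
              removed.close();
            }
          });
    }
    return connection;
  }

  int size() {
    return connections.size();
  }
}
```

The null check on the removed entry keeps the cleanup idempotent if the close hook ends up registered more than once for the same channel.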

(14). ChannelPool

class ChannelPool {
	// The signature alone shows that a Channel is obtained per address (method body omitted here)
	CompletableFuture<Channel> getChannel(final Address address, final String messageType);
}
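
To make the idea of a lazily filled pool concrete, here is a minimal sketch built around the two constructor arguments seen in section (7): a factory function (this::openChannel) and a pool size. Everything else is an assumption, in particular picking the slot from the hash of messageType and replacing failed futures. SimpleChannelPool is a hypothetical class, with String addresses and a type parameter C standing in for Address and Channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Simplified, hypothetical pool; "C" stands in for a Netty Channel.
final class SimpleChannelPool<C> {
  private final Function<String, CompletableFuture<C>> factory;
  private final int size;
  private final Map<String, List<CompletableFuture<C>>> slots = new ConcurrentHashMap<>();

  SimpleChannelPool(final Function<String, CompletableFuture<C>> factory, final int size) {
    this.factory = factory;
    this.size = size;
  }

  CompletableFuture<C> getChannel(final String address, final String messageType) {
    // One fixed-size list of slots per remote address, created on first use.
    final List<CompletableFuture<C>> pool =
        slots.computeIfAbsent(
            address,
            key -> {
              final List<CompletableFuture<C>> empty = new ArrayList<>(size);
              for (int i = 0; i < size; i++) {
                empty.add(null);
              }
              return empty;
            });
    // Assumption: the message type decides which slot is used.
    final int slot = Math.floorMod(messageType.hashCode(), size);
    synchronized (pool) {
      CompletableFuture<C> future = pool.get(slot);
      if (future == null || future.isCompletedExceptionally()) {
        // Nothing usable in this slot yet: open a channel via the factory.
        future = factory.apply(address);
        pool.set(slot, future);
      }
      return future;
    }
  }
}
```

With this shape, constructing the pool opens nothing. The factory runs only on the first getChannel call for a given address and slot, which is consistent with bootstrapClient being invoked only when a request is sent.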

(15). Summary

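After all of this tracing, the takeaway is simple: the most important thing the NettyMessagingService constructor does during initialization is create the ChannelPool, which, as the name suggests, is a pool of Netty Channels. No client Channel is opened at that point; openChannel is only handed to the pool as the function it calls later.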