(1). 概述

在前面的剖析,发现NettyMessagingService的构造器,主要是创建:ChannelPool,并没有什么特殊的东西,我们接着往下剖析,看下:NettyMessagingService内部到底干了什么?

(2). NettyMessagingService.start

public CompletableFuture<MessagingService> start() {
     // ... ... 
	 
	 // 初始化:EventLoopGroup
    initTransport();
	
	
    return serviceLoader
	     // ************************************************************* 
		 // 启动Server
		 // ************************************************************* 
        .thenCompose(ok -> bootstrapServer())
        .thenRun(
            () -> {
              timeoutExecutor =
                  Executors.newSingleThreadScheduledExecutor(
                      new DefaultThreadFactory("netty-messaging-timeout-"));
              localConnection = new LocalClientConnection(handlers);
              started.set(true);

              log.info(
                  "Started messaging service bound to {}, advertising {}, and using {}",
                  bindingAddresses,
                  advertisedAddress,
                  config.isTlsEnabled() ? "TLS" : "plaintext");
            })
        .thenApply(v -> this);
} // end


private void initTransport() {
	if (Epoll.isAvailable()) {
	  initEpollTransport();
	} else {
	  initNioTransport();
	}
} // end 


private void initEpollTransport() {
    clientGroup = new EpollEventLoopGroup(0, namedThreads("netty-messaging-event-epoll-client-%d", log));
    serverGroup = new EpollEventLoopGroup(0, namedThreads("netty-messaging-event-epoll-server-%d", log));
    serverChannelClass = EpollServerSocketChannel.class;
    clientChannelClass = EpollSocketChannel.class;
} //end 

private void initNioTransport() {
    clientGroup = new NioEventLoopGroup(0, namedThreads("netty-messaging-event-nio-client-%d", log));
    serverGroup = new NioEventLoopGroup(0, namedThreads("netty-messaging-event-nio-server-%d", log));
    serverChannelClass = NioServerSocketChannel.class;
    clientChannelClass = NioSocketChannel.class;
}// end 

(3). NettyMessagingService.bootstrapServer

private CompletableFuture<Void> bootstrapServer() {
    final ServerBootstrap b = new ServerBootstrap();
    b.option(ChannelOption.SO_REUSEADDR, true);
    b.option(ChannelOption.SO_BACKLOG, 128);
    b.childOption(
        ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024));
    b.childOption(ChannelOption.SO_RCVBUF, 1024 * 1024);
    b.childOption(ChannelOption.SO_SNDBUF, 1024 * 1024);
    b.childOption(ChannelOption.SO_KEEPALIVE, true);
    b.childOption(ChannelOption.TCP_NODELAY, true);
    b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    b.group(serverGroup, clientGroup);
    b.channel(serverChannelClass);
	// ************************************************************************
	// 配置ChannelInitializer
	// ************************************************************************
    b.childHandler(new BasicServerChannelInitializer());
	
    return bind(b);
}

(4). NettyMessagingService$BasicServerChannelInitializer

private class BasicServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(final SocketChannel channel) {
      // ... ...
      // *************************************************************************
	  // 添加了握手协议
	  // *************************************************************************
      channel.pipeline().addLast("handshake", new ServerHandshakeHandlerAdapter());

      // ... ...
    } //end 
}

(5). NettyMessagingService$ServerHandshakeHandlerAdapter

private class ServerHandshakeHandlerAdapter extends HandshakeHandlerAdapter<ProtocolRequest> {

    @Override
    public void channelRead(final ChannelHandlerContext context, final Object message)
        throws Exception {
      // Read the protocol version from the client handshake. If the client's protocol version is
      // unknown
      // to the server, use the latest server protocol version.
      readProtocolVersion(context, (ByteBuf) message)
          .ifPresent(
              version -> {
                // ... ...
				// ***************************************************************
				// 
				// ***************************************************************
                activateProtocolVersion(
                    context,
                    new RemoteServerConnection(handlers, context.channel()),
                    protocolVersion);
              });
    }

    @Override
    void activateProtocolVersion(
        final ChannelHandlerContext context,
        final Connection<ProtocolRequest> connection,
        final ProtocolVersion protocolVersion) {
        // ... ... 
		// ***************************************************************
		// 
		// ***************************************************************
      super.activateProtocolVersion(context, connection, protocolVersion);
    }
  }

(6). NettyMessagingService.activateProtocolVersion

void activateProtocolVersion(
        final ChannelHandlerContext context,
        final Connection<M> connection,
        final ProtocolVersion protocolVersion) {
	  // *****************************************************************		
	  // 创建消息编码与解码处理器
	  // *****************************************************************		
      final MessagingProtocol protocol = protocolVersion.createProtocol(advertisedAddress);
	  // 从Pipeline移除当前的编解码器(NettyMessagingService$ServerHandshakeHandlerAdapter)
      context.pipeline().remove(this);
	  
      context.pipeline().addLast("encoder", protocol.newEncoder());
      context.pipeline().addLast("decoder", protocol.newDecoder());
      context.pipeline().addLast("handler", new MessageDispatcher<>(connection));
}

(8). 总结

NettyMessagingService的底层实际上是通过Netty监听一个端口(26502),通过这两个类进行编码和解码(MessageToByteEncoder/ByteToMessageDecoder),最终,会把请求移交给:MessageDispatcher进行处理,好像协议是自定义的:没有用PB或其它.