(1). 概述

在这一小篇,对GroupMembershipProtocol进行剖析.

(2). AtomixCluster.buildMembershipProtocol

protected static GroupMembershipProtocol buildMembershipProtocol(final ClusterConfig config) {
    return config.getProtocolConfig().getType().newProtocol(config.getProtocolConfig());
}

(3). SwimMembershipProtocol$Type.newProtocol

public class SwimMembershipProtocol
    extends AbstractListenerManager<GroupMembershipEvent, GroupMembershipEventListener>
    implements GroupMembershipProtocol {
		
	public static class Type implements GroupMembershipProtocol.Type<SwimMembershipProtocolConfig> {
		private static final String NAME = "swim";

		@Override
		public String name() {
		  return NAME;
		}

        // *****************************************************************
		// SwimMembershipProtocol$Type.newProtocol
		// *****************************************************************
		@Override
		public GroupMembershipProtocol newProtocol(final SwimMembershipProtocolConfig config) {
		  return new SwimMembershipProtocol(config);
		}
	  } // end SwimMembershipProtocol$Type
} 	  

(4). 分析SwimMembershipProtocol类的成员属性

public class SwimMembershipProtocol
    extends AbstractListenerManager<GroupMembershipEvent, GroupMembershipEventListener>
    implements GroupMembershipProtocol {

  public static final Type TYPE = new Type();
  private static final Logger LOGGER = LoggerFactory.getLogger(SwimMembershipProtocol.class);
  
  private static final String MEMBERSHIP_SYNC = "atomix-membership-sync";
  private static final String MEMBERSHIP_GOSSIP = "atomix-membership-gossip";
  private static final String MEMBERSHIP_PROBE = "atomix-membership-probe";
  private static final String MEMBERSHIP_PROBE_REQUEST = "atomix-membership-probe-request";
  
  // 序列化
  private static final Serializer SERIALIZER =
      Serializer.using(
          new Namespace.Builder()
              .register(Namespaces.BASIC)
              .nextId(Namespaces.BEGIN_USER_CUSTOM_ID)
              .register(MemberId.class)
              .register(new AddressSerializer(), Address.class)
              .register(ImmutableMember.class)
              .register(State.class)
              .register(ImmutablePair.class)
              .name("ClusterMembershipService")
              .build());

  // 
  private final SwimMembershipProtocolConfig config;
  private final AtomicBoolean started = new AtomicBoolean();
  
  private final Map<MemberId, SwimMember> members = Maps.newConcurrentMap();
  private final List<SwimMember> randomMembers = Lists.newCopyOnWriteArrayList();
  private final Map<MemberId, ImmutableMember> updates = new LinkedHashMap<>();
  private final List<SwimMember> syncMembers = new ArrayList<>();
  
  private final ScheduledExecutorService swimScheduler =
      Executors.newSingleThreadScheduledExecutor(
          namedThreads("atomix-cluster-heartbeat-sender", LOGGER));
  private final ExecutorService eventExecutor =
      Executors.newSingleThreadExecutor(namedThreads("atomix-cluster-events", LOGGER));
  private final AtomicInteger probeCounter = new AtomicInteger();
  
  // *********************************************************************
  // NodeDiscoveryService底层委托给了:NodeDiscoveryProvider
  // NodeDiscoveryProvider的内容,上一篇有详细讲解
  // *********************************************************************
  private NodeDiscoveryService discoveryService;
  
  // *********************************************************************
  // BootstrapService实际是:AtomixClusterImpl
  // *********************************************************************
  private BootstrapService bootstrapService;
  private SwimMember localMember;
  
  // *********************************************************************
  // 节点发现事件监听器
  // *********************************************************************
  private final NodeDiscoveryEventListener discoveryEventListener = this::handleDiscoveryEvent;
  
  // *********************************************************************
  // 探测请求处理/同步/探测/goossip
  // *********************************************************************
  private final BiFunction<Address, byte[], CompletableFuture<byte[]>> probeRequestHandler =
      (address, payload) ->
          handleProbeRequest(SERIALIZER.decode(payload)).thenApply(SERIALIZER::encode);
  private final BiFunction<Address, byte[], byte[]> syncHandler =
      (address, payload) -> SERIALIZER.encode(handleSync(SERIALIZER.decode(payload)));
  private final BiFunction<Address, byte[], byte[]> probeHandler =
      (address, payload) -> SERIALIZER.encode(handleProbe(SERIALIZER.decode(payload)));
  private final BiConsumer<Address, byte[]> gossipListener =
      (address, payload) -> handleGossipUpdates(SERIALIZER.decode(payload));
  // ... ...
}  

(5). SwimMembershipProtocol.handleDiscoveryEvent

private void handleDiscoveryEvent(final NodeDiscoveryEvent event) {
    switch (event.type()) {
      case JOIN:
	    // ************************************************************
		// 处理节点加入集群
		// ************************************************************
        handleJoinEvent(event.subject());
        break;
      case LEAVE:
	    // ************************************************************
		// 处理节点离开集群
	    // ************************************************************
        handleLeaveEvent(event.subject());
        break;
      default:
        throw new AssertionError();
    } // end switch
}

(6). SwimMembershipProtocol.join

public CompletableFuture<Void> join(
      final BootstrapService bootstrap, final NodeDiscoveryService discovery, final Member member) {
    if (started.compareAndSet(false, true)) {
      bootstrapService = bootstrap;
      discoveryService = discovery;
	  
	  // 构建本地成员
      localMember =
          new SwimMember(
              member.id(),
              member.address(),
              member.zone(),
              member.rack(),
              member.host(),
              member.properties(),
              member.version(),
              System.currentTimeMillis());
      localProperties.putAll(localMember.properties());
	  
	  // 添加监听
      discoveryService.addListener(discoveryEventListener);

      // 设置本地成员状态为:激活
      localMember.setState(State.ALIVE);
	  
	  // 添加本地成员到成员列表集合里
      members.put(localMember.id(), localMember);
	  
	  // 触发成员添加事件
      post(new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_ADDED, localMember));

      LOGGER.debug("Nodes from discovery service {}", discoveryService.getNodes());
	  
	  
      registerHandlers();
      
	  // **********************************************************
	  // 以下三个都是开启定时任务,发送TCP/UDP请求.更新成员属性信息.
	  // **********************************************************
      scheduleGossip();
      scheduleProbe();
      scheduleSync();

      LOGGER.info("Started");
    }
    return CompletableFuture.completedFuture(null);
} //end join


private void registerHandlers() {
	// ****************************************************
	// SWIM与Gossip还是有一些不一样,既用了TCP还用了UDP协议
	// ****************************************************
	// 为TCP配置Handler(成员通步/成员探测/成员探测请求)
    // Register TCP message handlers.
    bootstrapService
        .getMessagingService()
        .registerHandler(MEMBERSHIP_SYNC, syncHandler, swimScheduler);
    bootstrapService
        .getMessagingService()
        .registerHandler(MEMBERSHIP_PROBE, probeHandler, swimScheduler);
    bootstrapService
        .getMessagingService()
        .registerHandler(MEMBERSHIP_PROBE_REQUEST, probeRequestHandler);

    // 为UDP配置Handler(Gossip)
	// Register UDP message listeners.
    bootstrapService
        .getUnicastService()
        .addListener(MEMBERSHIP_GOSSIP, gossipListener, swimScheduler);
} // end registerHandlers

(7). 总结

SwimMembershipProtocol的主要职责是:更新成员列表/根据成员ID获得成员详细信息/加入集群/离开集群,其实,内部又委托给了:MessagingService进行通信.