(1). 概述

在前面,剖析到StartupProcess会持有一堆StartupStep,那么,StartupStep是什么呢?它的作用是什么呢?在这一小篇主要对StartupStep(ApiMessagingServiceStep)进行剖析.

(2). 查看ApiMessagingServiceStep的继承关系

io.camunda.zeebe.util.startup.StartupStep
    io.camunda.zeebe.broker.bootstrap.AbstractBrokerStartupStep
	    io.camunda.zeebe.broker.bootstrap.ApiMessagingServiceStep

(3). 接口StartupStep

看接口,大概能知道这个接口,主要是负责启动和关闭,而,StartupProcess负责调度所有的:StartupStep.

public interface StartupStep<CONTEXT> {
	String getName();
	
	// 启动
	ActorFuture<CONTEXT> startup(final CONTEXT context);
	
	// 关闭
	ActorFuture<CONTEXT> shutdown(final CONTEXT context);
}	

(4). 抽象类(AbstractBrokerStartupStep)

// ***************************************************************
// AbstractBrokerStartupStep指定了具体的:CONTEXT,它的类型为:BrokerStartupContext
// 我们稍微看一下:BrokerStartupContext是什么
// ***************************************************************
abstract class AbstractBrokerStartupStep implements StartupStep<BrokerStartupContext> {
	
	// 预留两个方法,给子类(比如:ApiMessagingServiceStep)去实现. 
	// ... ... 
	abstract void startupInternal(
	      final BrokerStartupContext brokerStartupContext,
	      final ConcurrencyControl concurrencyControl,
	      final ActorFuture<BrokerStartupContext> startupFuture);
	
	  abstract void shutdownInternal(
	      final BrokerStartupContext brokerShutdownContext,
	      final ConcurrencyControl concurrencyControl,
	      final ActorFuture<BrokerStartupContext> shutdownFuture);
}	

(5). BrokerStartupContext

有一个大胆的猜测,StartupStep的实现类,实际是对:BrokerStartupContext的填充(调用相应的set方法).

public interface BrokerStartupContext {

  BrokerInfo getBrokerInfo();

  BrokerCfg getBrokerConfiguration();

  SpringBrokerBridge getSpringBrokerBridge();

  ActorSchedulingService getActorSchedulingService();

  @Deprecated // use getActorSchedulingService instead
  ActorScheduler getActorScheduler();

  ConcurrencyControl getConcurrencyControl();

  BrokerHealthCheckService getHealthCheckService();

  void addPartitionListener(PartitionListener partitionListener);

  void removePartitionListener(PartitionListener partitionListener);

  List<PartitionListener> getPartitionListeners();

  ClusterServicesImpl getClusterServices();

  void setClusterServices(ClusterServicesImpl o);

  void addDiskSpaceUsageListener(DiskSpaceUsageListener listener);

  void removeDiskSpaceUsageListener(DiskSpaceUsageListener listener);

  CommandApiServiceImpl getCommandApiService();

  void setCommandApiService(CommandApiServiceImpl commandApiService);

  AdminApiRequestHandler getAdminApiService();

  void setAdminApiService(AdminApiRequestHandler adminApiService);

  AtomixServerTransport getCommandApiServerTransport();

  void setCommandApiServerTransport(AtomixServerTransport commandApiServerTransport);

  ManagedMessagingService getApiMessagingService();

  void setApiMessagingService(ManagedMessagingService commandApiMessagingService);

  SubscriptionApiCommandMessageHandlerService getSubscriptionApiService();

  void setSubscriptionApiService(
      SubscriptionApiCommandMessageHandlerService subscriptionApiService);

  EmbeddedGatewayService getEmbeddedGatewayService();

  void setEmbeddedGatewayService(EmbeddedGatewayService embeddedGatewayService);

  DiskSpaceUsageMonitor getDiskSpaceUsageMonitor();

  void setDiskSpaceUsageMonitor(DiskSpaceUsageMonitor diskSpaceUsageMonitor);

  LeaderManagementRequestHandler getLeaderManagementRequestHandler();

  void setLeaderManagementRequestHandler(final LeaderManagementRequestHandler handler);

  ExporterRepository getExporterRepository();

  PartitionManagerImpl getPartitionManager();

  void setPartitionManager(PartitionManagerImpl partitionManager);

  BrokerAdminServiceImpl getBrokerAdminService();

  void setBrokerAdminService(final BrokerAdminServiceImpl brokerAdminService);
}

(6). ApiMessagingServiceStep

public class ApiMessagingServiceStep extends AbstractBrokerStartupStep {
  private static final Logger LOG = Loggers.SYSTEM_LOGGER;

  @Override
  void startupInternal(
      final BrokerStartupContext brokerStartupContext,
      final ConcurrencyControl concurrencyControl,
      final ActorFuture<BrokerStartupContext> startupFuture) {
    final var brokerCfg = brokerStartupContext.getBrokerConfiguration();
    final var commandApiCfg = brokerCfg.getNetwork().getCommandApi();
    final var securityCfg = brokerCfg.getNetwork().getSecurity();

    final var messagingConfig = new MessagingConfig();
    messagingConfig.setInterfaces(List.of(commandApiCfg.getHost()));
    messagingConfig.setPort(commandApiCfg.getPort());

    if (securityCfg.isEnabled()) {
      messagingConfig
          .setTlsEnabled(true)
          .setCertificateChain(securityCfg.getCertificateChainPath())
          .setPrivateKey(securityCfg.getPrivateKeyPath());
    }

    messagingConfig.setCompressionAlgorithm(brokerCfg.getCluster().getMessageCompression());
    
	// ***************************************************************
	// 暂且不看:NettyMessagingService的内容,从名称上,是不是能看出来一点什么?  
	// NettyMessagingService肯定是跟网络(会绑定某个端口或跟某个端口通信)相关了.  
	// ***************************************************************
    final var messagingService =
        new NettyMessagingService(
            brokerCfg.getCluster().getClusterName(),
			// 26501
            Address.from(commandApiCfg.getAdvertisedHost(), commandApiCfg.getAdvertisedPort()),
            messagingConfig);

    messagingService
        .start()
        .whenComplete(
            (createdMessagingService, error) -> {
              if (error != null) {
                startupFuture.completeExceptionally(error);
              } else {
                concurrencyControl.run(
                    () -> {
                      LOG.debug(
                          "Bound API to {}, using advertised address {} ",
                          messagingService.bindingAddresses(),
                          messagingService.address());
                      brokerStartupContext.setApiMessagingService(messagingService);
                      startupFuture.complete(brokerStartupContext);
                    });
              }
            });
  } // end startupInternal
}

(7). 总结

分析了半天,然后,StartupStep是一个生命周期管理系统,它最终的目标是填充BrokerStartupContext里所有的set方法.