(1). 概述

这一小篇主要剖析CommandApiRequestHandler:它是RequestHandler的实现,主要用于接收请求,并做业务处理.

(2). GatewayBrokerTransportStep.startServerTransport

/**
 * Excerpt of the startup step that creates the broker's server transport.
 *
 * <p>Only the beginning of the method is shown: it reads the concurrency control, broker info,
 * actor scheduling service and API messaging service from the startup context, then constructs
 * an {@code AtomixServerTransport} from the broker's node id and the messaging service. The
 * locals that look unused here (and {@code startupFuture}) are presumably consumed by the part
 * of the method that this excerpt omits.
 *
 * @param brokerStartupContext context from which the collaborators are obtained
 * @param startupFuture future for the startup step (not touched in the visible excerpt)
 */
private void startServerTransport(
      final BrokerStartupContext brokerStartupContext,
      final ActorFuture<BrokerStartupContext> startupFuture) {

    final var concurrencyControl = brokerStartupContext.getConcurrencyControl();
    final var brokerInfo = brokerStartupContext.getBrokerInfo();
    final var schedulingService = brokerStartupContext.getActorSchedulingService();
    final var messagingService = brokerStartupContext.getApiMessagingService();

    // **********************************************************************
	// 1. Create the AtomixServerTransport (node id + API messaging service)
	// **********************************************************************
    final var atomixServerTransport = new AtomixServerTransport(brokerInfo.getNodeId(), messagingService);
}

(3). AtomixServerTransport.subscribe

为每个topic(由partitionId和requestType确定)注册相应的请求处理器.

/**
 * Subscribes a request handler for the topic derived from the given partition and request type.
 *
 * <p>The work is submitted to the actor via {@code actor.call}: it makes sure a request map
 * exists for the partition and registers a handler on the messaging service that forwards every
 * incoming request payload to {@code handleAtomixRequest}.
 *
 * <p>In this walkthrough the {@code requestHandler} passed in is the CommandApiRequestHandler.
 *
 * @param partitionId partition whose topic is subscribed
 * @param requestType request type used together with the partition id to build the topic name
 * @param requestHandler handler that will eventually receive the requests
 * @return the future returned by {@code actor.call}
 */
public ActorFuture<Void> subscribe(final int partitionId, final RequestType requestType, final RequestHandler requestHandler) {
    return actor.call(
        () -> {
          final var topicName = topicName(partitionId, requestType);
          LOG.trace("Subscribe for topic {}", topicName);
          // Ensure the per-partition map of in-flight requests exists before any request arrives.
          partitionsRequestMap.computeIfAbsent(partitionId, id -> new Long2ObjectHashMap<>());
		 
		 // ********************************************************************  
		 // 2. Call NettyMessagingService.registerHandler to register the handler
		 // ********************************************************************  
          messagingService.registerHandler(
              topicName,
              (sender, request) ->
                  handleAtomixRequest(request, partitionId, requestType, requestHandler));
        }
		
		// The lambda passed to registerHandler above is equivalent to this anonymous-class form:
		// messagingService.registerHandler(
		//     topicName,
		//     new BiFunction<Address, byte[], CompletableFuture<byte[]>>(){
		//       public CompletableFuture<byte[]> apply( Address sender, byte[] request) {
		//	      // ... ...
		//       } 
		//    }
		// );
    ); // end return
}// end subscribe

(4). NettyMessagingService.registerHandler

/**
 * Registers a handler for the given message type (excerpt; parts are elided with "... ...").
 *
 * <p>The supplied {@code BiFunction} is wrapped in a {@code (message, connection)} callback that
 * invokes it with the message's sender and payload and then attaches a completion callback
 * (whose body is not shown in this excerpt).
 *
 * @param type message type / topic name under which the handler is registered
 * @param handler function that receives the sender address and the request bytes and returns a
 *     future of the response bytes
 */
public void registerHandler(final String type, final BiFunction<Address, byte[], CompletableFuture<byte[]>> handler) {
	handlers.register(type,
	     // *******************************************************************
	     // 4. `handler` is the BiFunction registered by AtomixServerTransport.subscribe;
	     //    calling handler.apply therefore ends up in handleAtomixRequest, which in turn
	     //    invokes the RequestHandler (the CommandApiRequestHandler in this walkthrough).
		 //    This method adds one more wrapping layer; the code is equivalent to:
		 //    new BiConsumer<ProtocolRequest, ServerConnection>(){
		 //	      public void accept(ProtocolRequest request, ServerConnection connection){
		 //          new BiFunction<Address, byte[], CompletableFuture<byte[]>>(){
		 //              public CompletableFuture<byte[]> apply( Address sender, byte[] request) {
		 //               	  handleAtomixRequest(... ...)
		 //              }
		 //          }
		 //       }
		 //   }
	     // *******************************************************************
		 (message, connection) -> 
           handler
		       .apply(message.sender(), message.payload())
		       .whenComplete(  // ... ... )
        // ... ...
    );		 
}	

(6). AtomixServerTransport.handleAtomixRequest

/**
 * Dispatches one incoming request to the given request handler and returns a future for its
 * response bytes.
 *
 * <p>The dispatch itself is submitted to the actor via {@code actor.call}. Inside that job a new
 * request id is taken from {@code requestCount}, the partition's request map is looked up, and
 * the handler is invoked with this transport as its first argument. Only after the handler
 * returned without throwing is the future stored in the request map under the request id.
 *
 * <p>The future is completed exceptionally when no request map exists for the partition
 * ({@code IllegalStateException}) or when the handler throws. How it is completed normally is
 * not visible in this excerpt — presumably when the response for the request id is sent.
 *
 * @param requestBytes raw request payload
 * @param partitionId partition the request is addressed to
 * @param requestType request type; used here only to build the topic name for trace logging
 * @param requestHandler handler to invoke (the CommandApiRequestHandler in this walkthrough)
 * @return future of the response bytes
 */
private CompletableFuture<byte[]> handleAtomixRequest(
      final byte[] requestBytes,
      final int partitionId,
      final RequestType requestType,
      final RequestHandler requestHandler) {
    final var completableFuture = new CompletableFuture<byte[]>();
    actor.call(
        () -> {
          final var requestId = requestCount.getAndIncrement();
          final var requestMap = partitionsRequestMap.get(partitionId);
          // No map means subscribe() has not created one for this partition: fail the request.
          if (requestMap == null) {
            final var errorMsg = String.format(ERROR_MSG_MISSING_PARTITON_MAP, partitionId);
            LOG.trace(errorMsg);
            completableFuture.completeExceptionally(new IllegalStateException(errorMsg));
            return;
          }

          try {
			//   requestHandler here is the CommandApiRequestHandler; the payload is wrapped in
			//   an UnsafeBuffer and handed over in full (offset 0, length = byte count).
            requestHandler.onRequest(
                this,
                partitionId,
                requestId,
                new UnsafeBuffer(requestBytes),
                0,
                requestBytes.length);
            if (LOG.isTraceEnabled()) {
              LOG.trace(
                  "Handled request {} for topic {}",
                  requestId,
                  topicName(partitionId, requestType));
            }
            // we only add the request to the map after successful handling
            requestMap.put(requestId, completableFuture);
          } catch (final Exception exception) {
            // Handler failure: log it and propagate the exception through the future.
            LOG.error(
                "Unexpected exception on handling request for partition {}.",
                partitionId,
                exception);
            completableFuture.completeExceptionally(exception);
          }
        });

    return completableFuture;
}

(5). CommandApiRequestHandler.handleExecuteCommandRequest

CommandApiRequestHandler.onRequest方法,最终是委托给了:handleExecuteCommandRequest方法.

/**
 * Validates an execute-command request and writes the command to the partition's log stream.
 *
 * <p>Checks are performed in this order, each returning {@code Either.left} with the error
 * writer on failure: disk space available, a log stream writer exists for the partition, the
 * request carries a supported event, and the partition's limiter grants the request. If all
 * pass, the command is written via {@code writeCommand}.
 *
 * @param partitionId partition the command is addressed to
 * @param requestId id of the request; stored in the record metadata and passed to the limiter
 * @param reader reader giving access to the decoded message, its event and its metadata
 * @param responseWriter writer returned on the success path
 * @param errorWriter writer that is filled with the error and returned on every failure path
 * @return {@code Either.left(errorWriter)} on failure, {@code Either.right(responseWriter)} when
 *     {@code writeCommand} returned without throwing
 */
private Either<ErrorResponseWriter, CommandApiResponseWriter> handleExecuteCommandRequest(
      final int partitionId,
      final long requestId,
      final CommandApiRequestReader reader,
      final CommandApiResponseWriter responseWriter,
      final ErrorResponseWriter errorWriter) {

    // Reject immediately when the broker has flagged that no disk space is available.
    if (!isDiskSpaceAvailable) {
      return Either.left(errorWriter.outOfDiskSpace(partitionId));
    }

    final var command = reader.getMessageDecoder();
    final var logStreamWriter = leadingStreams.get(partitionId);
    final var limiter = partitionLimiters.get(partitionId);

    final var eventType = command.valueType();
    final var intent = Intent.fromProtocolValue(eventType, command.intent());
    final var event = reader.event();
    final var metadata = reader.metadata();

    // Stamp the record metadata so the written record is a COMMAND tied to this request.
    metadata.requestId(requestId);
    metadata.requestStreamId(partitionId);
    metadata.recordType(RecordType.COMMAND);
    metadata.intent(intent);
    metadata.valueType(eventType);

    // No writer registered in leadingStreams for this partition: report a leader mismatch.
    if (logStreamWriter == null) {
      errorWriter.partitionLeaderMismatch(partitionId);
      return Either.left(errorWriter);
    }

    // The reader produced no event for this value type: report it as unsupported.
    if (event == null) {
      errorWriter.unsupportedMessage(
          eventType.name(), CommandApiRequestReader.RECORDS_BY_TYPE.keySet().toArray());
      return Either.left(errorWriter);
    }

    // Count the request, then ask the partition's limiter whether it may proceed.
    metrics.receivedRequest(partitionId);
    if (!limiter.tryAcquire(partitionId, requestId, intent)) {
      metrics.dropped(partitionId);
      LOG.trace(
          "Partition-{} receiving too many requests. Current limit {} inflight {}, dropping request {} from gateway",
          partitionId,
          limiter.getLimit(),
          limiter.getInflightCount(),
          requestId);
      errorWriter.resourceExhausted();
      return Either.left(errorWriter);
    }

    boolean written = false;
    try {
      // *************************************************************************
	  // Ultimately the command is written to the log stream
	  // *************************************************************************
      written = writeCommand(command.key(), metadata, event, logStreamWriter);
      // NOTE(review): a false result from writeCommand still returns Either.right; only the
      // limiter is informed in the finally block below — confirm this is intended.
      return Either.right(responseWriter);
    } catch (final Exception ex) {
      LOG.error("Unexpected error on writing {} command", intent, ex);
      // NOTE(review): the message says "response" although a command is being written.
      errorWriter.internalError("Failed writing response: %s", ex);
      return Either.left(errorWriter);
    } finally {
      // The limiter granted the request above; if nothing was written, notify it via onIgnore.
      if (!written) {
        limiter.onIgnore(partitionId, requestId);
      }
    }
} // end 

(6). RequestHandler

我们看下RequestHandler接口的签名,大概也就能理解它的职责了.

/**
 * Callback invoked by the server transport for every incoming request.
 *
 * <p>In this walkthrough it is implemented by CommandApiRequestHandler and called from
 * AtomixServerTransport.handleAtomixRequest.
 */
public interface RequestHandler {
	
	/**
	 * Handles one request.
	 *
	 * @param serverOutput output handle supplied by the transport (the transport passes itself);
	 *     presumably used to send the response — confirm against the implementation
	 * @param partitionId partition the request is addressed to
	 * @param requestId id the transport assigned to this request
	 * @param buffer buffer containing the request bytes
	 * @param offset offset in {@code buffer} at which the request starts
	 * @param length number of request bytes
	 */
	void onRequest(
	      ServerOutput serverOutput,
	      int partitionId,
	      long requestId,
	      DirectBuffer buffer,
	      int offset,
	      int length);
}	

(7). 总结

Zeebe充分利用了Java 8的lambda语法,相对来说是有一点点绕,当然,也有可能是我的能力有限,一时半会没理解它的代码.