(1). 概述
在这里,对ZeebeClient的源码进行剖析,首先需要了解下,ZeebeClient底层是GRpc通信(PB+Netty+Http2).
(2). ZeebeClient类图
(3). ZeebeClient源码入口
ZeebeClient.newClientBuilder().gatewayAddress(envVarAddress);
(4). ZeebeClient
static ZeebeClientBuilder newClientBuilder() {
return new ZeebeClientBuilderImpl();
}
(5). ZeebeClientBuilderImpl
public final class ZeebeClientBuilderImpl implements ZeebeClientBuilder, ZeebeClientConfiguration {
public static final String PLAINTEXT_CONNECTION_VAR = "ZEEBE_INSECURE_CONNECTION";
public static final String CA_CERTIFICATE_VAR = "ZEEBE_CA_CERTIFICATE_PATH";
public static final String KEEP_ALIVE_VAR = "ZEEBE_KEEP_ALIVE";
public static final String DEFAULT_GATEWAY_ADDRESS = "0.0.0.0:26500";
private final List<ClientInterceptor> interceptors = new ArrayList<>();
private String gatewayAddress = DEFAULT_GATEWAY_ADDRESS;
private int jobWorkerMaxJobsActive = 32;
private int numJobWorkerExecutionThreads = 1;
private String defaultJobWorkerName = "default";
private Duration defaultJobTimeout = Duration.ofMinutes(5);
private Duration defaultJobPollInterval = Duration.ofMillis(100);
private Duration defaultMessageTimeToLive = Duration.ofHours(1);
private Duration defaultRequestTimeout = Duration.ofSeconds(10);
private boolean usePlaintextConnection = false;
private String certificatePath;
private CredentialsProvider credentialsProvider;
private Duration keepAlive = Duration.ofSeconds(45);
private JsonMapper jsonMapper = new ZeebeObjectMapper();
public ZeebeClient build() {
applyOverrides();
applyDefaults();
// *********************************************************************
// 创建ZeebeClientImpl,传递ZeebeClientConfiguration
// *********************************************************************
return new ZeebeClientImpl(this);
} // end build
}
(6). ZeebeClientImpl构建器
// 1. 构造器委托
public ZeebeClientImpl(final ZeebeClientConfiguration configuration) {
// ******************************************************************
// 构建GRPC中的ManagedChannel
// ******************************************************************
this(configuration, buildChannel(configuration));
}
// 2. 构造器委托
public ZeebeClientImpl(
final ZeebeClientConfiguration configuration, final ManagedChannel channel) {
// ******************************************************************
// 构建GRPC中的Stub
// ******************************************************************
this(configuration, channel, buildGatewayStub(channel, configuration));
}
// 3. 构造器委托
public ZeebeClientImpl(
final ZeebeClientConfiguration configuration,
final ManagedChannel channel,
final GatewayStub gatewayStub) {
// ******************************************************************
// 构建线程池
// ******************************************************************
this(configuration, channel, gatewayStub, buildExecutorService(configuration));
}
// 4. 构造器委托
public ZeebeClientImpl(
final ZeebeClientConfiguration config,
final ManagedChannel channel,
final GatewayStub gatewayStub,
final ScheduledExecutorService executorService) {
this.config = config;
this.jsonMapper = config.getJsonMapper();
this.channel = channel;
asyncStub = gatewayStub;
this.executorService = executorService;
if (config.getCredentialsProvider() != null) {
credentialsProvider = config.getCredentialsProvider();
} else {
credentialsProvider = new NoopCredentialsProvider();
}
// ******************************************************************
// 构建JobClient
// ******************************************************************
jobClient = newJobClient();
}
(7). ZeebeClientImpl.buildChannel
public static ManagedChannel buildChannel(final ZeebeClientConfiguration config) {
final URI address;
try {
address = new URI("zb://" + config.getGatewayAddress());
} catch (final URISyntaxException e) {
throw new RuntimeException("Failed to parse broker contact point", e);
}
// *************************************************************
// GRPC中创建Channel
// *************************************************************
final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(address.getHost(), address.getPort());
configureConnectionSecurity(config, channelBuilder);
channelBuilder.keepAliveTime(config.getKeepAlive().toMillis(), TimeUnit.MILLISECONDS);
channelBuilder.userAgent("zeebe-client-java/" + VersionUtil.getVersion());
return channelBuilder.build();
}
(8). ZeebeClientImpl.buildGatewayStub
public static GatewayStub buildGatewayStub(
final ManagedChannel channel, final ZeebeClientConfiguration config) {
final CallCredentials credentials = buildCallCredentials(config);
// GRPC通过PB生成的:GatewayStub
final GatewayStub gatewayStub = GatewayGrpc.newStub(channel).withCallCredentials(credentials);
if (!config.getInterceptors().isEmpty()) {
return gatewayStub.withInterceptors(
config.getInterceptors().toArray(new ClientInterceptor[] {}));
}
return gatewayStub;
}
(9). ZeebeClientImpl.buildExecutorService
private static ScheduledExecutorService buildExecutorService(
final ZeebeClientConfiguration configuration) {
// JobWorker线程数
final int threadCount = configuration.getNumJobWorkerExecutionThreads();
return Executors.newScheduledThreadPool(threadCount);
}
(10). ZeebeClientImpl.newJobClient
private JobClient newJobClient() {
// 包裹:GatewayStub,GatewayStub是GRPC通信的核心
return new JobClientImpl(asyncStub, config, jsonMapper, credentialsProvider::shouldRetryRequest);
}
(11). ZeebeClient接口扫盲
为什么在这里要稍微扫盲一下ZeebeClient,原因在于:ZeebeClient代表着Client拥有哪些能力.
public interface ZeebeClient extends AutoCloseable, JobClient {
TopologyRequestStep1 newTopologyRequest();
ZeebeClientConfiguration getConfiguration();
// 创建流程部署命令
DeployProcessCommandStep1 newDeployCommand();
// 创建新的流程实例命令
CreateProcessInstanceCommandStep1 newCreateInstanceCommand();
// 创建流程实例取消命令
CancelProcessInstanceCommandStep1 newCancelInstanceCommand(long processInstanceKey);
// 为某个流程,设置变量命令
SetVariablesCommandStep1 newSetVariablesCommand(long elementInstanceKey);
// 发布消息命令
PublishMessageCommandStep1 newPublishMessageCommand();
ResolveIncidentCommandStep1 newResolveIncidentCommand(long incidentKey);
UpdateRetriesJobCommandStep1 newUpdateRetriesCommand(long jobKey);
UpdateRetriesJobCommandStep1 newUpdateRetriesCommand(ActivatedJob job);
// 创建Worker命令(Worker是指我们的业务逻辑Handler)
JobWorkerBuilderStep1 newWorker();
// 创建一个活动的Job命令
ActivateJobsCommandStep1 newActivateJobsCommand();
} // end ZeebeClient
public interface JobClient {
// 完成命令
CompleteJobCommandStep1 newCompleteCommand(long jobKey);
CompleteJobCommandStep1 newCompleteCommand(ActivatedJob job);
// 失败命令
FailJobCommandStep1 newFailCommand(long jobKey);
FailJobCommandStep1 newFailCommand(ActivatedJob job);
// 异常命令
ThrowErrorCommandStep1 newThrowErrorCommand(long jobKey);
ThrowErrorCommandStep1 newThrowErrorCommand(ActivatedJob job);
} // end JobClient
(12). 总结
总体来说,ZeebeClient设计还是挺不错的,用到了:构建者模式,命令模式,责任链模式,ZeebClient有不少值得学习的地方,从始至终能发现,它基本都是面向接口编程.