|
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.registry.api.Registry;
|
|
|
import org.apache.dolphinscheduler.registry.api.RegistryException;
|
|
|
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
|
|
|
|
|
|
+import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
import java.nio.charset.StandardCharsets;
|
|
|
import java.time.Duration;
|
|
@@ -36,6 +37,7 @@ import java.util.concurrent.ExecutionException;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
+import javax.net.ssl.SSLException;
|
|
|
|
|
|
import lombok.NonNull;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
@@ -60,6 +62,8 @@ import io.etcd.jetcd.options.PutOption;
|
|
|
import io.etcd.jetcd.options.WatchOption;
|
|
|
import io.etcd.jetcd.support.Observers;
|
|
|
import io.etcd.jetcd.watch.WatchEvent;
|
|
|
+import io.grpc.netty.GrpcSslContexts;
|
|
|
+import io.netty.handler.ssl.SslContext;
|
|
|
|
|
|
/**
|
|
|
* This is one of the implementation of {@link Registry}, with this implementation, you need to rely on Etcd cluster to
|
|
@@ -80,7 +84,7 @@ public class EtcdRegistry implements Registry {
|
|
|
private final Map<String, Watch.Watcher> watcherMap = new ConcurrentHashMap<>();
|
|
|
|
|
|
private static final long TIME_TO_LIVE_SECONDS = 30L;
|
|
|
- public EtcdRegistry(EtcdRegistryProperties registryProperties) {
|
|
|
+ public EtcdRegistry(EtcdRegistryProperties registryProperties) throws SSLException {
|
|
|
ClientBuilder clientBuilder = Client.builder()
|
|
|
.endpoints(Util.toURIs(Splitter.on(",").trimResults().splitToList(registryProperties.getEndpoints())))
|
|
|
.namespace(byteSequence(registryProperties.getNamespace()))
|
|
@@ -100,6 +104,19 @@ public class EtcdRegistry implements Registry {
|
|
|
if (StringUtils.hasLength(registryProperties.getAuthority())) {
|
|
|
clientBuilder.authority(registryProperties.getAuthority());
|
|
|
}
|
|
|
+ if (StringUtils.hasLength(registryProperties.getCertFile())
|
|
|
+ && StringUtils.hasLength(registryProperties.getKeyCertChainFile())
|
|
|
+ && StringUtils.hasLength(registryProperties.getKeyFile())) {
|
|
|
+ String userDir = System.getProperty("user.dir") + "/";
|
|
|
+ File certFile = new File(userDir + registryProperties.getCertFile());
|
|
|
+ File keyCertChainFile = new File(userDir + registryProperties.getKeyCertChainFile());
|
|
|
+ File keyFile = new File(userDir + registryProperties.getKeyFile());
|
|
|
+ SslContext context = GrpcSslContexts.forClient()
|
|
|
+ .trustManager(certFile)
|
|
|
+ .keyManager(keyCertChainFile, keyFile)
|
|
|
+ .build();
|
|
|
+ clientBuilder.sslContext(context);
|
|
|
+ }
|
|
|
client = clientBuilder.build();
|
|
|
log.info("Started Etcd Registry...");
|
|
|
etcdConnectionStateListener = new EtcdConnectionStateListener(client);
|