Sfoglia il codice sorgente

[Feature-3134]Refactor to use a shared singleton Curator Zookeeper Client (#3244)

* [Feature-3134]Refactor to use a shared singleton Curator Zookeeper Client

* autowire CuratorZookeeperClient to ZookeeperOperator

* [Improvement] Add the issue specifications reference (#3221)

* log zookeeper address when the connect state change

* resume the operation of add connect state listener in MasterRegistry

Co-authored-by: Yichao Yang <1048262223@qq.com>
tswstarplanet 4 anni fa
parent
commit
5b7efd2d33

+ 2 - 3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java

@@ -81,8 +81,7 @@ public class ZookeeperMonitor extends AbstractZKClient {
 				if(ok){
 					state.getZookeeperInfo();
 				}
-				
-				String hostName = zookeeperServer;
+
 				int connections = state.getConnections();
 				int watches = state.getWatches();
 				long sent = state.getSent();
@@ -95,7 +94,7 @@ public class ZookeeperMonitor extends AbstractZKClient {
 				int status = ok ? 1 : 0;
 				Date date = new Date();
 
-				ZookeeperRecord zookeeperRecord = new ZookeeperRecord(hostName,connections,watches,sent,received,mode,minLatency,avgLatency,maxLatency,nodeCount,status,date);
+				ZookeeperRecord zookeeperRecord = new ZookeeperRecord(zookeeperServer,connections,watches,sent,received,mode,minLatency,avgLatency,maxLatency,nodeCount,status,date);
 				list.add(zookeeperRecord);
 
 			}

+ 2 - 1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java

@@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 import org.apache.dolphinscheduler.server.zk.SpringZKServer;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
 import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
 import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
 import org.junit.Test;
@@ -43,7 +44,7 @@ import java.util.Set;
 @RunWith(SpringJUnit4ClassRunner.class)
 @ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class,
         NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class,
-        ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class})
+        ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, CuratorZookeeperClient.class})
 public class MasterTaskExecThreadTest {
 
 

+ 1 - 1
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java

@@ -326,7 +326,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
 	@Override
 	public String toString() {
 		return "AbstractZKClient{" +
-				"zkClient=" + zkClient +
+				"zkClient=" + getZkClient() +
 				", deadServerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\'' +
 				", masterZNodeParentPath='" + getZNodeParentPath(ZKNodeType.MASTER) + '\'' +
 				", workerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.WORKER) + '\'' +

+ 119 - 0
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java

@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.service.zk;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
+
+/**
+ * Shared Curator zookeeper client
+ */
+@Component
+public class CuratorZookeeperClient implements InitializingBean {
+    private final Logger logger = LoggerFactory.getLogger(CuratorZookeeperClient.class);
+
+    @Autowired
+    private ZookeeperConfig zookeeperConfig;
+
+    private CuratorFramework zkClient;
+
+
+    @Override
+    public void afterPropertiesSet() throws Exception {
+        this.zkClient = buildClient();
+        initStateLister();
+    }
+
+    private CuratorFramework buildClient() {
+        logger.info("zookeeper registry center init, server lists is: {}.", zookeeperConfig.getServerList());
+
+        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(),"zookeeper quorum can't be null")))
+                .retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getBaseSleepTimeMs(), zookeeperConfig.getMaxRetries(), zookeeperConfig.getMaxSleepMs()));
+
+        //these has default value
+        if (0 != zookeeperConfig.getSessionTimeoutMs()) {
+            builder.sessionTimeoutMs(zookeeperConfig.getSessionTimeoutMs());
+        }
+        if (0 != zookeeperConfig.getConnectionTimeoutMs()) {
+            builder.connectionTimeoutMs(zookeeperConfig.getConnectionTimeoutMs());
+        }
+        if (StringUtils.isNotBlank(zookeeperConfig.getDigest())) {
+            builder.authorization("digest", zookeeperConfig.getDigest().getBytes(StandardCharsets.UTF_8)).aclProvider(new ACLProvider() {
+
+                @Override
+                public List<ACL> getDefaultAcl() {
+                    return ZooDefs.Ids.CREATOR_ALL_ACL;
+                }
+
+                @Override
+                public List<ACL> getAclForPath(final String path) {
+                    return ZooDefs.Ids.CREATOR_ALL_ACL;
+                }
+            });
+        }
+        zkClient = builder.build();
+        zkClient.start();
+        try {
+            zkClient.blockUntilConnected();
+        } catch (final Exception ex) {
+            throw new RuntimeException(ex);
+        }
+        return zkClient;
+    }
+
+    public void initStateLister() {
+        checkNotNull(zkClient);
+
+        zkClient.getConnectionStateListenable().addListener((client, newState) -> {
+            if(newState == ConnectionState.LOST){
+                logger.error("connection lost from zookeeper");
+            } else if(newState == ConnectionState.RECONNECTED){
+                logger.info("reconnected to zookeeper");
+            } else if(newState == ConnectionState.SUSPENDED){
+                logger.warn("connection SUSPENDED to zookeeper");
+            }
+        });
+    }
+
+    public ZookeeperConfig getZookeeperConfig() {
+        return zookeeperConfig;
+    }
+
+    public void setZookeeperConfig(ZookeeperConfig zookeeperConfig) {
+        this.zookeeperConfig = zookeeperConfig;
+    }
+
+    public CuratorFramework getZkClient() {
+        return zkClient;
+    }
+}

+ 1 - 1
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java

@@ -39,7 +39,7 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
      */
     @Override
     protected void registerListener() {
-        treeCache = new TreeCache(zkClient, getZookeeperConfig().getDsRoot() + "/nodes");
+        treeCache = new TreeCache(getZkClient(), getZookeeperConfig().getDsRoot() + "/nodes");
         logger.info("add listener to zk path: {}", getZookeeperConfig().getDsRoot());
         try {
             treeCache.start();

+ 17 - 74
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java

@@ -50,14 +50,10 @@ public class ZookeeperOperator implements InitializingBean {
     private final Logger logger = LoggerFactory.getLogger(ZookeeperOperator.class);
 
     @Autowired
-    private ZookeeperConfig zookeeperConfig;
-
-    protected CuratorFramework zkClient;
+    private CuratorZookeeperClient zookeeperClient;
 
     @Override
     public void afterPropertiesSet() throws Exception {
-        this.zkClient = buildClient();
-        initStateLister();
         registerListener();
     }
 
@@ -66,62 +62,9 @@ public class ZookeeperOperator implements InitializingBean {
      */
     protected void registerListener(){}
 
-    public void initStateLister() {
-        checkNotNull(zkClient);
-
-        zkClient.getConnectionStateListenable().addListener((client, newState) -> {
-            if(newState == ConnectionState.LOST){
-                logger.error("connection lost from zookeeper");
-            } else if(newState == ConnectionState.RECONNECTED){
-                logger.info("reconnected to zookeeper");
-            } else if(newState == ConnectionState.SUSPENDED){
-                logger.warn("connection SUSPENDED to zookeeper");
-            }
-        });
-    }
-
-    private CuratorFramework buildClient() {
-        logger.info("zookeeper registry center init, server lists is: {}.", zookeeperConfig.getServerList());
-
-        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(),"zookeeper quorum can't be null")))
-                .retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getBaseSleepTimeMs(), zookeeperConfig.getMaxRetries(), zookeeperConfig.getMaxSleepMs()));
-
-        //these has default value
-        if (0 != zookeeperConfig.getSessionTimeoutMs()) {
-            builder.sessionTimeoutMs(zookeeperConfig.getSessionTimeoutMs());
-        }
-        if (0 != zookeeperConfig.getConnectionTimeoutMs()) {
-            builder.connectionTimeoutMs(zookeeperConfig.getConnectionTimeoutMs());
-        }
-        if (StringUtils.isNotBlank(zookeeperConfig.getDigest())) {
-            builder.authorization("digest", zookeeperConfig.getDigest().getBytes(StandardCharsets.UTF_8)).aclProvider(new ACLProvider() {
-
-                @Override
-                public List<ACL> getDefaultAcl() {
-                    return ZooDefs.Ids.CREATOR_ALL_ACL;
-                }
-
-                @Override
-                public List<ACL> getAclForPath(final String path) {
-                    return ZooDefs.Ids.CREATOR_ALL_ACL;
-                }
-            });
-        }
-        zkClient = builder.build();
-        zkClient.start();
-        try {
-            if (!zkClient.blockUntilConnected(zookeeperConfig.getMaxWaitTime(), TimeUnit.MILLISECONDS)) {
-                throw new IllegalStateException("Connect zookeeper expire max wait time");
-            }
-        } catch (final Exception ex) {
-            throw new RuntimeException(ex);
-        }
-        return zkClient;
-    }
-
     public String get(final String key) {
         try {
-            return new String(zkClient.getData().forPath(key), StandardCharsets.UTF_8);
+            return new String(zookeeperClient.getZkClient().getData().forPath(key), StandardCharsets.UTF_8);
         } catch (Exception ex) {
             logger.error("get key : {}", key, ex);
         }
@@ -131,7 +74,7 @@ public class ZookeeperOperator implements InitializingBean {
     public List<String> getChildrenKeys(final String key) {
         List<String> values;
         try {
-            values = zkClient.getChildren().forPath(key);
+            values = zookeeperClient.getZkClient().getChildren().forPath(key);
             return values;
         } catch (InterruptedException ex) {
             logger.error("getChildrenKeys key : {} InterruptedException", key);
@@ -145,7 +88,7 @@ public class ZookeeperOperator implements InitializingBean {
     public boolean hasChildren(final String key){
         Stat stat ;
         try {
-            stat = zkClient.checkExists().forPath(key);
+            stat = zookeeperClient.getZkClient().checkExists().forPath(key);
             return stat.getNumChildren() >= 1;
         } catch (Exception ex) {
             throw new IllegalStateException(ex);
@@ -154,7 +97,7 @@ public class ZookeeperOperator implements InitializingBean {
 
     public boolean isExisted(final String key) {
         try {
-            return zkClient.checkExists().forPath(key) != null;
+            return zookeeperClient.getZkClient().checkExists().forPath(key) != null;
         } catch (Exception ex) {
             logger.error("isExisted key : {}", key, ex);
         }
@@ -164,7 +107,7 @@ public class ZookeeperOperator implements InitializingBean {
     public void persist(final String key, final String value) {
         try {
             if (!isExisted(key)) {
-                zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(StandardCharsets.UTF_8));
+                zookeeperClient.getZkClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(StandardCharsets.UTF_8));
             } else {
                 update(key, value);
             }
@@ -176,9 +119,9 @@ public class ZookeeperOperator implements InitializingBean {
     public void update(final String key, final String value) {
         try {
 
-            CuratorOp check = zkClient.transactionOp().check().forPath(key);
-            CuratorOp setData = zkClient.transactionOp().setData().forPath(key, value.getBytes(StandardCharsets.UTF_8));
-            zkClient.transaction().forOperations(check, setData);
+            CuratorOp check = zookeeperClient.getZkClient().transactionOp().check().forPath(key);
+            CuratorOp setData = zookeeperClient.getZkClient().transactionOp().setData().forPath(key, value.getBytes(StandardCharsets.UTF_8));
+            zookeeperClient.getZkClient().transaction().forOperations(check, setData);
 
         } catch (Exception ex) {
             logger.error("update key : {} , value : {}", key, value, ex);
@@ -189,12 +132,12 @@ public class ZookeeperOperator implements InitializingBean {
         try {
             if (isExisted(key)) {
                 try {
-                    zkClient.delete().deletingChildrenIfNeeded().forPath(key);
+                    zookeeperClient.getZkClient().delete().deletingChildrenIfNeeded().forPath(key);
                 } catch (KeeperException.NoNodeException ignore) {
                     //NOP
                 }
             }
-            zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
+            zookeeperClient.getZkClient().create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
         } catch (final Exception ex) {
             logger.error("persistEphemeral key : {} , value : {}", key, value, ex);
         }
@@ -206,7 +149,7 @@ public class ZookeeperOperator implements InitializingBean {
                 persistEphemeral(key, value);
             } else {
                 if (!isExisted(key)) {
-                    zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
+                    zookeeperClient.getZkClient().create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
                 }
             }
         } catch (final Exception ex) {
@@ -216,7 +159,7 @@ public class ZookeeperOperator implements InitializingBean {
 
     public void persistEphemeralSequential(final String key, String value) {
         try {
-            zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
+            zookeeperClient.getZkClient().create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
         } catch (final Exception ex) {
             logger.error("persistEphemeralSequential key : {}", key, ex);
         }
@@ -225,7 +168,7 @@ public class ZookeeperOperator implements InitializingBean {
     public void remove(final String key) {
         try {
             if (isExisted(key)) {
-                zkClient.delete().deletingChildrenIfNeeded().forPath(key);
+                zookeeperClient.getZkClient().delete().deletingChildrenIfNeeded().forPath(key);
             }
         } catch (KeeperException.NoNodeException ignore) {
             //NOP
@@ -235,14 +178,14 @@ public class ZookeeperOperator implements InitializingBean {
     }
 
     public CuratorFramework getZkClient() {
-        return zkClient;
+        return zookeeperClient.getZkClient();
     }
 
     public ZookeeperConfig getZookeeperConfig() {
-        return zookeeperConfig;
+        return zookeeperClient.getZookeeperConfig();
     }
 
     public void close() {
-        CloseableUtils.closeQuietly(zkClient);
+        CloseableUtils.closeQuietly(zookeeperClient.getZkClient());
     }
 }

+ 67 - 0
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClientTest.java

@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.service.zk;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+public class CuratorZookeeperClientTest {
+    private static ZKServer zkServer;
+
+    @Before
+    public void before() throws IOException {
+        new Thread(() -> {
+            if (zkServer == null) {
+                zkServer = new ZKServer();
+            }
+            zkServer.startLocalZkServer(2185);
+        }).start();
+    }
+
+    @After
+    public void after() {
+        if (zkServer != null) {
+            zkServer.stop();
+        }
+    }
+
+    @Test
+    public void testAfterPropertiesSet() throws Exception {
+        TimeUnit.SECONDS.sleep(10);
+        CuratorZookeeperClient zookeeperClient = new CuratorZookeeperClient();
+        ZookeeperConfig zookeeperConfig = new ZookeeperConfig();
+        zookeeperConfig.setServerList("127.0.0.1:2185");
+        zookeeperConfig.setBaseSleepTimeMs(100);
+        zookeeperConfig.setMaxSleepMs(30000);
+        zookeeperConfig.setMaxRetries(10);
+        zookeeperConfig.setSessionTimeoutMs(60000);
+        zookeeperConfig.setConnectionTimeoutMs(30000);
+        zookeeperConfig.setDigest(" ");
+        zookeeperConfig.setDsRoot("/dolphinscheduler");
+        zookeeperConfig.setMaxWaitTime(30000);
+        zookeeperClient.setZookeeperConfig(zookeeperConfig);
+        System.out.println("start");
+        zookeeperClient.afterPropertiesSet();
+        System.out.println("end");
+        Assert.assertNotNull(zookeeperClient.getZkClient());
+    }
+}

+ 1 - 0
pom.xml

@@ -832,6 +832,7 @@
                         <include>**/service/quartz/cron/CronUtilsTest.java</include>
                         <include>**/service/zk/DefaultEnsembleProviderTest.java</include>
                         <include>**/service/zk/ZKServerTest.java</include>
+                        <include>**/service/zk/CuratorZookeeperClientTest.java</include>
                         <include>**/service/queue/TaskUpdateQueueTest.java</include>
 
                         <include>**/dao/mapper/DataSourceUserMapperTest.java</include>