Browse Source

[Bug-7686][Server] Fix server restart after a forced kill (#7688)

* [DS-7686][Server] Fix server restart after a forced kill

* update registry logic

Co-authored-by: caishunfeng <534328519@qq.com>
wind 3 years ago
parent
commit
d3bd7309fb

+ 13 - 8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

@@ -118,14 +118,6 @@ public class MasterRegistryClient {
             registryClient.getLock(nodeLock);
             // master registry
             registry();
-            String registryPath = getMasterPath();
-            registryClient.handleDeadServer(Collections.singleton(registryPath), NodeType.MASTER, Constants.DELETE_OP);
-
-            // init system node
-
-            while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) {
-                ThreadUtils.sleep(SLEEP_TIME_MILLIS);
-            }
 
             registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
         } catch (Exception e) {
@@ -500,7 +492,20 @@ public class MasterRegistryClient {
                 Constants.MASTER_TYPE,
                 registryClient);
 
+        // remove before persist
+        registryClient.remove(localNodePath);
         registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
+
+        while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) {
+            ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+        }
+
+        // sleep 1s, waiting for the master failover to finish its removal
+        ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+
+        // delete dead server
+        registryClient.handleDeadServer(Collections.singleton(localNodePath), NodeType.MASTER, Constants.DELETE_OP);
+
         registryClient.addConnectionStateListener(this::handleConnectionState);
         this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
         logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);

+ 19 - 5
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java

@@ -20,10 +20,12 @@ package org.apache.dolphinscheduler.server.worker.registry;
 import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
 import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
 import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
@@ -99,11 +101,6 @@ public class WorkerRegistryClient {
         Set<String> workerZkPaths = getWorkerZkPaths();
         int workerHeartbeatInterval = workerConfig.getHeartbeatInterval();
 
-        for (String workerZKPath : workerZkPaths) {
-            registryClient.persistEphemeral(workerZKPath, "");
-            logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath);
-        }
-
         HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
                 workerConfig.getMaxCpuLoadAvg(),
                 workerConfig.getReservedMemory(),
@@ -115,6 +112,23 @@ public class WorkerRegistryClient {
                 workerManagerThread.getThreadPoolQueueSize()
         );
 
+        for (String workerZKPath : workerZkPaths) {
+            // remove before persist
+            registryClient.remove(workerZKPath);
+            registryClient.persistEphemeral(workerZKPath, heartBeatTask.getHeartBeatInfo());
+            logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath);
+        }
+
+        while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.WORKER)) {
+            ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+        }
+
+        // sleep 1s, waiting for the master failover to finish its removal
+        ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+
+        // delete dead server
+        registryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
+
         this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
         logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval);
     }