Explorar o código

[Improvement-12907] Change heartbeat log level to debug (#12980)

Yann Ann %!s(int64=2) %!d(string=hai) anos
pai
achega
3106054ea7

+ 35 - 35
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java

@@ -99,34 +99,44 @@ public class LowerWeightHostManager extends CommonHostManager {
         public void notify(Map<String, Set<String>> workerGroups, Map<String, WorkerHeartBeat> workerNodeInfo) {
             syncWorkerResources(workerGroups, workerNodeInfo);
         }
-    }
 
-    /**
-     * Sync worker resource.
-     *
-     * @param workerGroupNodes  worker group nodes, key is worker group, value is worker group nodes.
-     * @param workerNodeInfoMap worker node info map, key is worker node, value is worker info.
-     */
-    private void syncWorkerResources(final Map<String, Set<String>> workerGroupNodes,
-                                     final Map<String, WorkerHeartBeat> workerNodeInfoMap) {
-        try {
-            Map<String, Set<HostWeight>> workerHostWeights = new HashMap<>();
-            for (Map.Entry<String, Set<String>> entry : workerGroupNodes.entrySet()) {
-                String workerGroup = entry.getKey();
-                Set<String> nodes = entry.getValue();
-                Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
-                for (String node : nodes) {
-                    WorkerHeartBeat heartbeat = workerNodeInfoMap.getOrDefault(node, null);
-                    Optional<HostWeight> hostWeightOpt = getHostWeight(node, workerGroup, heartbeat);
-                    hostWeightOpt.ifPresent(hostWeights::add);
-                }
-                if (!hostWeights.isEmpty()) {
-                    workerHostWeights.put(workerGroup, hostWeights);
+        /**
+         * Sync worker resource.
+         *
+         * @param workerGroupNodes  worker group nodes, key is worker group, value is worker group nodes.
+         * @param workerNodeInfoMap worker node info map, key is worker node, value is worker info.
+         */
+        private void syncWorkerResources(final Map<String, Set<String>> workerGroupNodes,
+                                         final Map<String, WorkerHeartBeat> workerNodeInfoMap) {
+            try {
+                Map<String, Set<HostWeight>> workerHostWeights = new HashMap<>();
+                for (Map.Entry<String, Set<String>> entry : workerGroupNodes.entrySet()) {
+                    String workerGroup = entry.getKey();
+                    Set<String> nodes = entry.getValue();
+                    Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
+                    for (String node : nodes) {
+                        WorkerHeartBeat heartbeat = workerNodeInfoMap.getOrDefault(node, null);
+                        Optional<HostWeight> hostWeightOpt = getHostWeight(node, workerGroup, heartbeat);
+                        hostWeightOpt.ifPresent(hostWeights::add);
+                    }
+                    if (!hostWeights.isEmpty()) {
+                        workerHostWeights.put(workerGroup, hostWeights);
+                    }
                 }
+                syncWorkerHostWeight(workerHostWeights);
+            } catch (Throwable ex) {
+                logger.error("Sync worker resource error", ex);
+            }
+        }
+
+        private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights) {
+            lock.lock();
+            try {
+                workerHostWeightsMap.clear();
+                workerHostWeightsMap.putAll(workerHostWeights);
+            } finally {
+                lock.unlock();
             }
-            syncWorkerHostWeight(workerHostWeights);
-        } catch (Throwable ex) {
-            logger.error("Sync worker resource error", ex);
         }
     }
 
@@ -155,16 +165,6 @@ public class LowerWeightHostManager extends CommonHostManager {
                         heartBeat.getStartupTime()));
     }
 
-    private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights) {
-        lock.lock();
-        try {
-            workerHostWeightsMap.clear();
-            workerHostWeightsMap.putAll(workerHostWeights);
-        } finally {
-            lock.unlock();
-        }
-    }
-
     private Set<HostWeight> getWorkerHostWeights(String workerGroup) {
         lock.lock();
         try {

+ 22 - 31
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

@@ -129,8 +129,7 @@ public class ServerNodeManager implements InitializingBean {
 
         // load nodes from zookeeper
         updateMasterNodes();
-        updateWorkerNodes();
-        updateWorkerGroupMappings();
+        refreshWorkerNodesAndGroupMappings();
 
         // init executor service
         executorService =
@@ -153,27 +152,21 @@ public class ServerNodeManager implements InitializingBean {
         @Override
         public void run() {
             try {
-
                 // sync worker node info
-                updateWorkerNodes();
-                updateWorkerGroupMappings();
-                notifyWorkerInfoChangeListeners();
+                refreshWorkerNodesAndGroupMappings();
             } catch (Exception e) {
                 logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e);
             }
         }
     }
 
-    protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo,
-                                                        WorkerGroup wg) {
-        Set<String> nodes = new HashSet<>();
-        String[] addrs = wg.getAddrList().split(Constants.COMMA);
-        for (String addr : addrs) {
-            if (newWorkerNodeInfo.containsKey(addr)) {
-                nodes.add(addr);
-            }
-        }
-        return nodes;
+    /**
+     * Refresh worker nodes and worker group mapping information
+     */
+    private void refreshWorkerNodesAndGroupMappings() {
+        updateWorkerNodes();
+        updateWorkerGroupMappings();
+        notifyWorkerInfoChangeListeners();
     }
 
     /**
@@ -204,7 +197,15 @@ public class ServerNodeManager implements InitializingBean {
                 } catch (Exception ex) {
                     logger.error("WorkerGroupListener capture data change and get data failed", ex);
                 }
+            }
+        }
 
+        private void syncSingleWorkerNodeInfo(String workerAddress, WorkerHeartBeat info) {
+            workerNodeInfoWriteLock.lock();
+            try {
+                workerNodeInfo.put(workerAddress, info);
+            } finally {
+                workerNodeInfoWriteLock.unlock();
             }
         }
     }
@@ -241,8 +242,8 @@ public class ServerNodeManager implements InitializingBean {
         try {
             registryClient.getLock(nodeLock);
             Collection<String> currentNodes = registryClient.getMasterNodesDirectly();
-            List<Server> masterNodes = registryClient.getServerList(NodeType.MASTER);
-            syncMasterNodes(currentNodes, masterNodes);
+            List<Server> masterNodeList = registryClient.getServerList(NodeType.MASTER);
+            syncMasterNodes(currentNodes, masterNodeList);
         } catch (Exception e) {
             logger.error("update master nodes error", e);
         } finally {
@@ -289,7 +290,6 @@ public class ServerNodeManager implements InitializingBean {
         try {
             workerGroupNodes.clear();
             workerGroupNodes.putAll(tmpWorkerGroupMappings);
-            notifyWorkerInfoChangeListeners();
         } finally {
             workerGroupWriteLock.unlock();
         }
@@ -363,15 +363,6 @@ public class ServerNodeManager implements InitializingBean {
         }
     }
 
-    private void syncSingleWorkerNodeInfo(String workerAddress, WorkerHeartBeat info) {
-        workerNodeInfoWriteLock.lock();
-        try {
-            workerNodeInfo.put(workerAddress, info);
-        } finally {
-            workerNodeInfoWriteLock.unlock();
-        }
-    }
-
     /**
      * Add the resource change listener, when the resource changed, the listener will be notified.
      *
@@ -382,10 +373,10 @@ public class ServerNodeManager implements InitializingBean {
     }
 
     private void notifyWorkerInfoChangeListeners() {
-        Map<String, Set<String>> workerGroupNodes = getWorkerGroupNodes();
-        Map<String, WorkerHeartBeat> workerNodeInfo = getWorkerNodeInfo();
+        Map<String, Set<String>> workerGroupNodeMap = getWorkerGroupNodes();
+        Map<String, WorkerHeartBeat> workerNodeInfoMap = getWorkerNodeInfo();
         for (WorkerInfoChangeListener listener : workerInfoChangeListeners) {
-            listener.notify(workerGroupNodes, workerNodeInfo);
+            listener.notify(workerGroupNodeMap, workerNodeInfoMap);
         }
     }
 

+ 1 - 1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java

@@ -68,7 +68,7 @@ public class MasterHeartBeatTask extends BaseHeartBeatTask<MasterHeartBeat> {
     public void writeHeartBeat(MasterHeartBeat masterHeartBeat) {
         String masterHeartBeatJson = JSONUtils.toJsonString(masterHeartBeat);
         registryClient.persistEphemeral(heartBeatPath, masterHeartBeatJson);
-        log.info("Success write master heartBeatInfo into registry, masterRegistryPath: {}, heartBeatInfo: {}",
+        log.debug("Success write master heartBeatInfo into registry, masterRegistryPath: {}, heartBeatInfo: {}",
                 heartBeatPath, masterHeartBeatJson);
     }
 }

+ 2 - 3
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java

@@ -59,9 +59,8 @@ public class WorkerHeartBeatTask extends BaseHeartBeatTask<WorkerHeartBeat> {
         double reservedMemory = workerConfig.getReservedMemory();
         double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
         int execThreads = workerConfig.getExecThreads();
-        int workerWaitingTaskCount = this.workerWaitingTaskCount.get();
         int serverStatus = getServerStatus(loadAverage, maxCpuLoadAvg, availablePhysicalMemorySize, reservedMemory,
-                execThreads, workerWaitingTaskCount);
+                execThreads, this.workerWaitingTaskCount.get());
 
         return WorkerHeartBeat.builder()
                 .startupTime(ServerLifeCycleManager.getServerStartupTime())
@@ -86,7 +85,7 @@ public class WorkerHeartBeatTask extends BaseHeartBeatTask<WorkerHeartBeat> {
         String workerHeartBeatJson = JSONUtils.toJsonString(workerHeartBeat);
         String workerRegistryPath = workerConfig.getWorkerRegistryPath();
         registryClient.persistEphemeral(workerRegistryPath, workerHeartBeatJson);
-        log.info(
+        log.debug(
                 "Success write worker group heartBeatInfo into registry, workerRegistryPath: {} workerHeartBeatInfo: {}",
                 workerRegistryPath, workerHeartBeatJson);
     }