Explorar o código

Optimize some code (#8324)

Wenjun Ruan %!s(int64=3) %!d(string=hai) anos
pai
achega
66dcf2376a

+ 1 - 0
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

@@ -122,6 +122,7 @@ public class MasterRegistryClient {
             registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
             registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
         } catch (Exception e) {
         } catch (Exception e) {
             logger.error("master start up exception", e);
             logger.error("master start up exception", e);
+            throw new RuntimeException("master start up error", e);
         } finally {
         } finally {
             registryClient.releaseLock(nodeLock);
             registryClient.releaseLock(nodeLock);
         }
         }

+ 2 - 2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java

@@ -56,7 +56,7 @@ public class MasterRegistryDataListener implements SubscribeListener {
         }
         }
     }
     }
 
 
-    public void handleMasterEvent(Event event) {
+    private void handleMasterEvent(Event event) {
         final String path = event.path();
         final String path = event.path();
         switch (event.type()) {
         switch (event.type()) {
             case ADD:
             case ADD:
@@ -70,7 +70,7 @@ public class MasterRegistryDataListener implements SubscribeListener {
         }
         }
     }
     }
 
 
-    public void handleWorkerEvent(Event event) {
+    private void handleWorkerEvent(Event event) {
         final String path = event.path();
         final String path = event.path();
         switch (event.type()) {
         switch (event.type()) {
             case ADD:
             case ADD:

+ 16 - 15
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

@@ -37,7 +37,6 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
 
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashMap;
@@ -117,7 +116,7 @@ public class ServerNodeManager implements InitializingBean {
     @Autowired
     @Autowired
     private WorkerGroupMapper workerGroupMapper;
     private WorkerGroupMapper workerGroupMapper;
 
 
-    private MasterPriorityQueue masterPriorityQueue = new MasterPriorityQueue();
+    private final MasterPriorityQueue masterPriorityQueue = new MasterPriorityQueue();
 
 
     /**
     /**
      * alert dao
      * alert dao
@@ -125,15 +124,16 @@ public class ServerNodeManager implements InitializingBean {
     @Autowired
     @Autowired
     private AlertDao alertDao;
     private AlertDao alertDao;
 
 
-    public static volatile List<Integer> SLOT_LIST = new ArrayList<>();
+    private static volatile int MASTER_SLOT = 0;
 
 
-    public static volatile Integer MASTER_SIZE = 0;
+    private static volatile int MASTER_SIZE = 0;
 
 
-    public static Integer getSlot() {
-        if (SLOT_LIST.size() > 0) {
-            return SLOT_LIST.get(0);
-        }
-        return 0;
+    public static int getSlot() {
+        return MASTER_SLOT;
+    }
+
+    public static int getMasterSize() {
+        return MASTER_SIZE;
     }
     }
 
 
 
 
@@ -295,7 +295,7 @@ public class ServerNodeManager implements InitializingBean {
     }
     }
 
 
     private void updateMasterNodes() {
     private void updateMasterNodes() {
-        SLOT_LIST.clear();
+        MASTER_SLOT = 0;
         this.masterNodes.clear();
         this.masterNodes.clear();
         String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS;
         String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS;
         try {
         try {
@@ -333,17 +333,18 @@ public class ServerNodeManager implements InitializingBean {
     private void syncMasterNodes(Collection<String> nodes, List<Server> masterNodes) {
     private void syncMasterNodes(Collection<String> nodes, List<Server> masterNodes) {
         masterLock.lock();
         masterLock.lock();
         try {
         try {
+            String host = NetUtils.getHost();
             this.masterNodes.addAll(nodes);
             this.masterNodes.addAll(nodes);
             this.masterPriorityQueue.clear();
             this.masterPriorityQueue.clear();
             this.masterPriorityQueue.putList(masterNodes);
             this.masterPriorityQueue.putList(masterNodes);
-            int index = masterPriorityQueue.getIndex(NetUtils.getHost());
+            int index = masterPriorityQueue.getIndex(host);
             if (index >= 0) {
             if (index >= 0) {
                 MASTER_SIZE = nodes.size();
                 MASTER_SIZE = nodes.size();
-                SLOT_LIST.add(masterPriorityQueue.getIndex(NetUtils.getHost()));
+                MASTER_SLOT = index;
+            } else {
+                logger.warn("current host:{} is not in active master list", host);
             }
             }
-            logger.info("update master nodes, master size: {}, slot: {}",
-                    MASTER_SIZE, SLOT_LIST.toString()
-            );
+            logger.info("update master nodes, master size: {}, slot: {}", MASTER_SIZE, MASTER_SLOT);
         } finally {
         } finally {
             masterLock.unlock();
             masterLock.unlock();
         }
         }

+ 15 - 23
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@@ -30,14 +30,12 @@ import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheM
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
 import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
 import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
 import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
-import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.CollectionUtils;
 
 
 import java.util.ArrayList;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -185,29 +183,24 @@ public class MasterSchedulerService extends Thread {
             return null;
             return null;
         }
         }
 
 
-        ProcessInstance[] processInstances = new ProcessInstance[commands.size()];
+        List<ProcessInstance> processInstances = new ArrayList<>(commands.size());
         CountDownLatch latch = new CountDownLatch(commands.size());
         CountDownLatch latch = new CountDownLatch(commands.size());
-        for (int i = 0; i < commands.size(); i++) {
-            int index = i;
-            this.masterPrepareExecService.execute(() -> {
-                Command command = commands.get(index);
-                // slot check again
-                if (!slotCheck(command)) {
-                    latch.countDown();
-                    return;
-                }
-
+        for (final Command command : commands) {
+            masterPrepareExecService.execute(() -> {
                 try {
                 try {
+                    // slot check again
+                    if (!slotCheck(command)) {
+                        return;
+                    }
                     ProcessInstance processInstance = processService.handleCommand(logger,
                     ProcessInstance processInstance = processService.handleCommand(logger,
                             getLocalAddress(),
                             getLocalAddress(),
                             command);
                             command);
                     if (processInstance != null) {
                     if (processInstance != null) {
-                        processInstances[index] = processInstance;
-                        logger.info("handle command command {} end, create process instance {}",
-                                command.getId(), processInstance.getId());
+                        processInstances.add(processInstance);
+                        logger.info("handle command {} end, create process instance {}", command.getId(), processInstance.getId());
                     }
                     }
                 } catch (Exception e) {
                 } catch (Exception e) {
-                    logger.error("scan command error ", e);
+                    logger.error("handle command error ", e);
                     processService.moveToErrorCommand(command, e.toString());
                     processService.moveToErrorCommand(command, e.toString());
                 } finally {
                 } finally {
                     latch.countDown();
                     latch.countDown();
@@ -222,7 +215,7 @@ public class MasterSchedulerService extends Thread {
             logger.error("countDownLatch await error ", e);
             logger.error("countDownLatch await error ", e);
         }
         }
 
 
-        return Arrays.asList(processInstances);
+        return processInstances;
     }
     }
 
 
     private List<Command> findCommands() {
     private List<Command> findCommands() {
@@ -230,9 +223,10 @@ public class MasterSchedulerService extends Thread {
         int pageSize = masterConfig.getFetchCommandNum();
         int pageSize = masterConfig.getFetchCommandNum();
         List<Command> result = new ArrayList<>();
         List<Command> result = new ArrayList<>();
         while (Stopper.isRunning()) {
         while (Stopper.isRunning()) {
-            if (ServerNodeManager.MASTER_SIZE == 0) {
+            if (ServerNodeManager.getMasterSize() == 0) {
                 return result;
                 return result;
             }
             }
+            // todo: Can we use the slot to scan database?
             List<Command> commandList = processService.findCommandPage(pageSize, pageNumber);
             List<Command> commandList = processService.findCommandPage(pageSize, pageNumber);
             if (commandList.size() == 0) {
             if (commandList.size() == 0) {
                 return result;
                 return result;
@@ -253,10 +247,8 @@ public class MasterSchedulerService extends Thread {
 
 
     private boolean slotCheck(Command command) {
     private boolean slotCheck(Command command) {
         int slot = ServerNodeManager.getSlot();
         int slot = ServerNodeManager.getSlot();
-        if (ServerNodeManager.MASTER_SIZE != 0 && command.getId() % ServerNodeManager.MASTER_SIZE == slot) {
-            return true;
-        }
-        return false;
+        int masterSize = ServerNodeManager.getMasterSize();
+        return masterSize != 0 && command.getId() % masterSize == slot;
     }
     }
 
 
     private String getLocalAddress() {
     private String getLocalAddress() {

+ 9 - 41
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java

@@ -32,11 +32,12 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelInitializer;
@@ -59,11 +60,6 @@ public class NettyRemotingServer {
      */
      */
     private final ServerBootstrap serverBootstrap = new ServerBootstrap();
     private final ServerBootstrap serverBootstrap = new ServerBootstrap();
 
 
-    /**
-     * encoder
-     */
-    private final NettyEncoder encoder = new NettyEncoder();
-
     /**
     /**
      * default executor
      * default executor
      */
      */
@@ -106,42 +102,14 @@ public class NettyRemotingServer {
      */
      */
     public NettyRemotingServer(final NettyServerConfig serverConfig) {
     public NettyRemotingServer(final NettyServerConfig serverConfig) {
         this.serverConfig = serverConfig;
         this.serverConfig = serverConfig;
+        ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setNameFormat("NettyServerBossThread_%s").build();
+        ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("NettyServerWorkerThread_%s").build();
         if (NettyUtils.useEpoll()) {
         if (NettyUtils.useEpoll()) {
-            this.bossGroup = new EpollEventLoopGroup(1, new ThreadFactory() {
-                private final AtomicInteger threadIndex = new AtomicInteger(0);
-
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet()));
-                }
-            });
-
-            this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
-                private final AtomicInteger threadIndex = new AtomicInteger(0);
-
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet()));
-                }
-            });
+            this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory);
+            this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);
         } else {
         } else {
-            this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() {
-                private final AtomicInteger threadIndex = new AtomicInteger(0);
-
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet()));
-                }
-            });
-
-            this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
-                private final AtomicInteger threadIndex = new AtomicInteger(0);
-
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet()));
-                }
-            });
+            this.bossGroup = new NioEventLoopGroup(1, bossThreadFactory);
+            this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);
         }
         }
     }
     }
 
 
@@ -191,7 +159,7 @@ public class NettyRemotingServer {
      */
      */
     private void initNettyChannel(SocketChannel ch) {
     private void initNettyChannel(SocketChannel ch) {
         ch.pipeline()
         ch.pipeline()
-                .addLast("encoder", encoder)
+                .addLast("encoder", new NettyEncoder())
                 .addLast("decoder", new NettyDecoder())
                 .addLast("decoder", new NettyDecoder())
                 .addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS))
                 .addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS))
                 .addLast("handler", serverHandler);
                 .addLast("handler", serverHandler);

+ 1 - 1
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@@ -275,7 +275,7 @@ public class ProcessService {
         }
         }
         processInstance.setCommandType(command.getCommandType());
         processInstance.setCommandType(command.getCommandType());
         processInstance.addHistoryCmd(command.getCommandType());
         processInstance.addHistoryCmd(command.getCommandType());
-        //if the processDefination is serial
+        //if the processDefinition is serial
         ProcessDefinition processDefinition = this.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
         ProcessDefinition processDefinition = this.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
         if (processDefinition.getExecutionType().typeIsSerial()) {
         if (processDefinition.getExecutionType().typeIsSerial()) {
             saveSerialProcess(processInstance, processDefinition);
             saveSerialProcess(processInstance, processDefinition);

+ 2 - 2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java

@@ -97,12 +97,12 @@ public class MasterPriorityQueue implements TaskPriorityQueue<Server> {
     }
     }
 
 
     /**
     /**
-     * server comparator
+     * server comparator, used to sort server by createTime in reverse order.
      */
      */
     private class ServerComparator implements Comparator<Server> {
     private class ServerComparator implements Comparator<Server> {
         @Override
         @Override
         public int compare(Server o1, Server o2) {
         public int compare(Server o1, Server o2) {
-            return o1.getCreateTime().before(o2.getCreateTime()) ? 1 : 0;
+            return o2.getCreateTime().compareTo(o1.getCreateTime());
         }
         }
     }
     }