Browse Source

Refactor worker (#2331)

* let quartz use the same datasource

* move master/worker config from dao.properties to each config
add master/worker registry test

* move mybatis config from application.properties to SpringConnectionFactory

* move mybatis-plus config from application.properties to SpringConnectionFactory

* refactor TaskCallbackService

* add ZookeeperNodeManagerTest

* add NettyExecutorManagerTest

* refactor TaskKillProcessor

* add RandomSelectorTest, RoundRobinSelectorTest, TaskCallbackServiceTest

* add RoundRobinHostManagerTest, ExecutorDispatcherTest

* refactor task response service

* add TaskResponseServiceTest

* modify close method for MasterSchedulerService
Tboy 5 years ago
parent
commit
3bafa97f96

+ 6 - 3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@@ -81,6 +81,9 @@ public class MasterServer {
     @Autowired
     private ZKMasterClient zkMasterClient;
 
+    /**
+     * scheduler service
+     */
     @Autowired
     private MasterSchedulerService masterSchedulerService;
 
@@ -160,16 +163,16 @@ public class MasterServer {
             Stopper.stop();
 
             try {
-                //thread sleep 3 seconds for thread quitely stop
+                //thread sleep 3 seconds for thread quietly stop
                 Thread.sleep(3000L);
             }catch (Exception e){
                 logger.warn("thread sleep exception ", e);
             }
+            //
+            this.masterSchedulerService.close();
             this.nettyRemotingServer.close();
             this.masterRegistry.unRegistry();
             this.zkMasterClient.close();
-            this.masterSchedulerService.close();
-
             //close quartz
             try{
                 QuartzExecutors.getInstance().shutdown();

+ 10 - 1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@@ -36,6 +36,7 @@ import org.springframework.stereotype.Service;
 
 import javax.annotation.PostConstruct;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  *  master scheduler thread
@@ -93,7 +94,15 @@ public class MasterSchedulerService extends Thread {
         super.start();
     }
 
-    public void close(){
+    public void close() {
+        masterExecService.shutdown();
+        boolean terminated = false;
+        try {
+            terminated = masterExecService.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (InterruptedException ignore) {}
+        if(!terminated){
+            logger.warn("masterExecService shutdown without terminated, increase await time");
+        }
         nettyRemotingClient.close();
         logger.info("master schedule service stopped...");
     }