Explorar o código

Refactor worker (#2121)

* refactor worker registry

* refactor master server

* refactor MasterSchedulerService
Tboy %!s(int64=5) %!d(string=hai) anos
pai
achega
fd6f13fff7

+ 7 - 0
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
 import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
 import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
 import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
+import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
 import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
@@ -80,6 +81,9 @@ public class MasterServer {
     @Autowired
     private ZKMasterClient zkMasterClient;
 
+    @Autowired
+    private MasterSchedulerService masterSchedulerService;
+
     /**
      * master server startup
      *
@@ -109,6 +113,8 @@ public class MasterServer {
         //
         this.zkMasterClient.start();
         this.masterRegistry.registry();
+        //
+        masterSchedulerService.start();
 
         // start QuartzExecutors
         // what system should do if exception
@@ -162,6 +168,7 @@ public class MasterServer {
             this.nettyRemotingServer.close();
             this.masterRegistry.unRegistry();
             this.zkMasterClient.close();
+            this.masterSchedulerService.close();
 
             //close quartz
             try{

+ 12 - 14
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@@ -34,8 +34,6 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
 import java.util.concurrent.ThreadPoolExecutor;
 
 /**
@@ -49,11 +47,6 @@ public class MasterSchedulerService extends Thread {
      */
     private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class);
 
-    /**
-     * master exec service
-     */
-    private ThreadPoolExecutor masterExecService;
-
     /**
      * dolphinscheduler database interface
      */
@@ -66,28 +59,33 @@ public class MasterSchedulerService extends Thread {
     @Autowired
     private ZKMasterClient zkMasterClient;
 
+    @Autowired
+    private MasterConfig masterConfig;
+
     /**
      *  netty remoting client
      */
-    private NettyRemotingClient nettyRemotingClient;
-
+    private final NettyRemotingClient nettyRemotingClient;
 
-    @Autowired
-    private MasterConfig masterConfig;
+    /**
+     * master exec service
+     */
+    private final ThreadPoolExecutor masterExecService;
 
     /**
      * constructor of MasterSchedulerThread
      */
-    @PostConstruct
-    public void init(){
+    public MasterSchedulerService(){
         this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads());
         NettyClientConfig clientConfig = new NettyClientConfig();
         this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
+    }
+
+    public void start(){
         super.setName("MasterSchedulerThread");
         super.start();
     }
 
-    @PreDestroy
     public void close(){
         nettyRemotingClient.close();
         logger.info("master schedule service stopped...");