Technoboy- 5 лет назад
Родитель
Сommit
76fd384a2e

+ 2 - 14
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@@ -17,7 +17,6 @@
 package org.apache.dolphinscheduler.server.master;
 
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@@ -26,19 +25,16 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.consumer.TaskUpdateQueueConsumer;
 import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
 import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
 import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
 import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
 import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
-import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob;
 import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
-import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl;
 import org.quartz.SchedulerException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,12 +80,6 @@ public class MasterServer {
     @Autowired
     private MasterConfig masterConfig;
 
-    /**
-     *  zookeeper registry center
-     */
-    @Autowired
-    private ZookeeperRegistryCenter zookeeperRegistryCenter;
-
     /**
      *  spring application context
      *  only use it for initialization
@@ -105,6 +95,7 @@ public class MasterServer {
     /**
      * master registry
      */
+    @Autowired
     private MasterRegistry masterRegistry;
 
     /**
@@ -126,7 +117,7 @@ public class MasterServer {
 
         //init remoting server
         NettyServerConfig serverConfig = new NettyServerConfig();
-        serverConfig.setListenPort(45678);
+        serverConfig.setListenPort(masterConfig.getListenPort());
         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
@@ -134,7 +125,6 @@ public class MasterServer {
         this.nettyRemotingServer.start();
 
         //
-        this.masterRegistry = new MasterRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), masterConfig.getMasterHeartbeatInterval());
         this.masterRegistry.registry();
 
         //
@@ -166,8 +156,6 @@ public class MasterServer {
             logger.error("start Quartz failed", e);
         }
 
-        TaskUpdateQueueConsumer taskUpdateQueueConsumer = SpringApplicationContext.getBean(TaskUpdateQueueConsumer.class);
-        taskUpdateQueueConsumer.start();
         /**
          *  register hooks, which are called before the process exits
          */

+ 11 - 0
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

@@ -46,6 +46,17 @@ public class MasterConfig {
     @Value("${master.host.selector:lowerWeight}")
     private String hostSelector;
 
+    @Value("${master.listen.port:45678}")
+    private int listenPort;
+
+    public int getListenPort() {
+        return listenPort;
+    }
+
+    public void setListenPort(int listenPort) {
+        this.listenPort = listenPort;
+    }
+
     public String getHostSelector() {
         return hostSelector;
     }

+ 8 - 0
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java

@@ -37,6 +37,8 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
+
 /**
  * TaskUpdateQueue consumer
  */
@@ -66,6 +68,12 @@ public class TaskUpdateQueueConsumer extends Thread{
     @Autowired
     private ExecutorDispatcher dispatcher;
 
+    @PostConstruct
+    public void init(){
+        super.setName("TaskUpdateQueueConsumerThread");
+        super.start();
+    }
+
     @Override
     public void run() {
         while (Stopper.isRunning()){

+ 19 - 23
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java

@@ -23,10 +23,14 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.remote.utils.Constants;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
+import javax.annotation.PostConstruct;
 import java.util.Date;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -37,6 +41,7 @@ import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
 /**
  *  master registry
  */
+@Service
 public class MasterRegistry {
 
     private final Logger logger = LoggerFactory.getLogger(MasterRegistry.class);
@@ -44,38 +49,28 @@ public class MasterRegistry {
     /**
      *  zookeeper registry center
      */
-    private final ZookeeperRegistryCenter zookeeperRegistryCenter;
+    @Autowired
+    private ZookeeperRegistryCenter zookeeperRegistryCenter;
 
     /**
-     *  port
+     * master config
      */
-    private final int port;
-
-    /**
-     * heartbeat interval
-     */
-    private final long heartBeatInterval;
+    @Autowired
+    private MasterConfig masterConfig;
 
     /**
      * heartbeat executor
      */
-    private final ScheduledExecutorService heartBeatExecutor;
+    private ScheduledExecutorService heartBeatExecutor;
 
     /**
      * worker start time
      */
-    private final String startTime;
+    private String startTime;
 
-    /**
-     * construct
-     * @param zookeeperRegistryCenter zookeeperRegistryCenter
-     * @param port port
-     * @param heartBeatInterval heartBeatInterval
-     */
-    public MasterRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){
-        this.zookeeperRegistryCenter = zookeeperRegistryCenter;
-        this.port = port;
-        this.heartBeatInterval = heartBeatInterval;
+
+    @PostConstruct
+    public void init(){
         this.startTime = DateUtils.dateToString(new Date());
         this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
     }
@@ -100,8 +95,9 @@ public class MasterRegistry {
                 }
             }
         });
-        this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), heartBeatInterval, heartBeatInterval, TimeUnit.SECONDS);
-        logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, heartBeatInterval);
+        int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval();
+        this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
+        logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);
     }
 
     /**
@@ -129,7 +125,7 @@ public class MasterRegistry {
      * @return
      */
     private String getLocalAddress(){
-        return Constants.LOCAL_ADDRESS + ":" + port;
+        return Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort();
     }
 
     /**

+ 10 - 17
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@@ -19,11 +19,9 @@ package org.apache.dolphinscheduler.server.worker;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
-import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
@@ -37,8 +35,6 @@ import org.springframework.boot.builder.SpringApplicationBuilder;
 import org.springframework.context.annotation.ComponentScan;
 
 import javax.annotation.PostConstruct;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
 
 /**
  *  worker server
@@ -51,18 +47,6 @@ public class WorkerServer {
      */
     private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class);
 
-    /**
-     *  worker config
-     */
-    @Autowired
-    private WorkerConfig workerConfig;
-
-    /**
-     *  zookeeper registry center
-     */
-    @Autowired
-    private ZookeeperRegistryCenter zookeeperRegistryCenter;
-
     /**
      *  netty remote server
      */
@@ -71,8 +55,15 @@ public class WorkerServer {
     /**
      *  worker registry
      */
+    @Autowired
     private WorkerRegistry workerRegistry;
 
+    /**
+     *  worker config
+     */
+    @Autowired
+    private WorkerConfig workerConfig;
+
     /**
      *  spring application context
      *  only use it for initialization
@@ -87,6 +78,7 @@ public class WorkerServer {
      * @param args arguments
      */
     public static void main(String[] args) {
+        System.setProperty("spring.profiles.active","worker");
         Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER);
         new SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args);
     }
@@ -101,12 +93,13 @@ public class WorkerServer {
 
         //init remoting server
         NettyServerConfig serverConfig = new NettyServerConfig();
+        serverConfig.setListenPort(workerConfig.getListenPort());
         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor());
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
         this.nettyRemotingServer.start();
 
-        this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval(), workerConfig.getWorkerGroup());
+        //
         this.workerRegistry.registry();
 
         /**

+ 11 - 0
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java

@@ -40,6 +40,17 @@ public class WorkerConfig {
     @Value("${worker.group: default}")
     private String workerGroup;
 
+    @Value("${worker.listen.port: 12345}")
+    private int listenPort;
+
+    public int getListenPort() {
+        return listenPort;
+    }
+
+    public void setListenPort(int listenPort) {
+        this.listenPort = listenPort;
+    }
+
     public String getWorkerGroup() {
         return workerGroup;
     }

+ 20 - 37
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

@@ -25,9 +25,13 @@ import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.remote.utils.Constants;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
+import javax.annotation.PostConstruct;
 import java.util.Date;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -41,6 +45,7 @@ import static org.apache.dolphinscheduler.common.Constants.SLASH;
 /**
  *  worker registry
  */
+@Service
 public class WorkerRegistry {
 
     private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class);
@@ -48,54 +53,31 @@ public class WorkerRegistry {
     /**
      *  zookeeper registry center
      */
-    private final ZookeeperRegistryCenter zookeeperRegistryCenter;
+    @Autowired
+    private ZookeeperRegistryCenter zookeeperRegistryCenter;
 
     /**
-     *  port
+     *  worker config
      */
-    private final int port;
-
-    /**
-     * heartbeat interval
-     */
-    private final long heartBeatInterval;
+    @Autowired
+    private WorkerConfig workerConfig;
 
     /**
      * heartbeat executor
      */
-    private final ScheduledExecutorService heartBeatExecutor;
+    private ScheduledExecutorService heartBeatExecutor;
 
     /**
      * worker start time
      */
-    private final String startTime;
+    private String startTime;
 
-    /**
-     * worker group
-     */
-    private String workerGroup;
 
-    /**
-     * construct
-     *
-     * @param zookeeperRegistryCenter zookeeperRegistryCenter
-     * @param port port
-     * @param heartBeatInterval heartBeatInterval
-     */
-    public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){
-        this(zookeeperRegistryCenter, port, heartBeatInterval, DEFAULT_WORKER_GROUP);
-    }
+    private String workerGroup;
 
-    /**
-     *  construct
-     * @param zookeeperRegistryCenter zookeeperRegistryCenter
-     * @param port port
-     */
-    public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval, String workerGroup){
-        this.zookeeperRegistryCenter = zookeeperRegistryCenter;
-        this.port = port;
-        this.heartBeatInterval = heartBeatInterval;
-        this.workerGroup = workerGroup;
+    @PostConstruct
+    public void init(){
+        this.workerGroup = workerConfig.getWorkerGroup();
         this.startTime = DateUtils.dateToString(new Date());
         this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
     }
@@ -120,8 +102,9 @@ public class WorkerRegistry {
                 }
             }
         });
-        this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), heartBeatInterval, heartBeatInterval, TimeUnit.SECONDS);
-        logger.info("worker node : {} registry to ZK successfully with heartBeatInterval : {}s", address, heartBeatInterval);
+        int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval();
+        this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
+        logger.info("worker node : {} registry to ZK successfully with heartBeatInterval : {}s", address, workerHeartbeatInterval);
 
     }
 
@@ -159,7 +142,7 @@ public class WorkerRegistry {
      * @return
      */
     private String getLocalAddress(){
-        return Constants.LOCAL_ADDRESS + ":" + port;
+        return Constants.LOCAL_ADDRESS + ":" + workerConfig.getListenPort();
     }
 
     /**