Browse Source

[Bug-9719][Master] fix failover fail because task plugins has not been loaded (#9720)

caishunfeng 3 years ago
parent
commit
5657cb9aec

+ 7 - 0
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
 import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
 import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 
 import javax.annotation.PostConstruct;
 
@@ -68,6 +69,9 @@ public class MasterServer implements IStoppable {
     @Autowired
     private MasterRegistryClient masterRegistryClient;
 
+    @Autowired
+    private TaskPluginManager taskPluginManager;
+
     @Autowired
     private MasterSchedulerService masterSchedulerService;
 
@@ -131,6 +135,9 @@ public class MasterServer implements IStoppable {
 
         this.nettyRemotingServer.start();
 
+        // install task plugin
+        this.taskPluginManager.installPlugin();
+
         // self tolerant
         this.masterRegistryClient.init();
         this.masterRegistryClient.start();

+ 3 - 0
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java

@@ -50,6 +50,9 @@ public class FailoverExecuteThread extends Thread {
 
     @Override
     public void run() {
+        // when startup, wait 10s for ready
+        ThreadUtils.sleep((long) Constants.SLEEP_TIME_MILLIS * 10);
+
         logger.info("failover execute thread started");
         while (Stopper.isRunning()) {
             try {

+ 0 - 4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@@ -33,7 +33,6 @@ import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutor
 import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 
 import org.apache.commons.collections4.CollectionUtils;
 
@@ -102,9 +101,6 @@ public class MasterSchedulerService extends Thread {
     @Autowired
     private StateWheelExecuteThread stateWheelExecuteThread;
 
-    @Autowired
-    private TaskPluginManager taskPluginManager;
-
     /**
      * constructor of MasterSchedulerService
      */

+ 1 - 4
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java

@@ -40,8 +40,6 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.boot.context.event.ApplicationReadyEvent;
-import org.springframework.context.event.EventListener;
 import org.springframework.stereotype.Component;
 
 @Component
@@ -86,8 +84,7 @@ public class TaskPluginManager {
         return taskChannel.parseParameters(parametersNode);
     }
 
-    @EventListener
-    public void installPlugin(ApplicationReadyEvent readyEvent) {
+    public void installPlugin() {
         final Set<String> names = new HashSet<>();
 
         ServiceLoader.load(TaskChannelFactory.class).forEach(factory -> {

+ 3 - 0
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@@ -155,6 +155,9 @@ public class WorkerServer implements IStoppable {
 
         this.nettyRemotingServer.start();
 
+        // install task plugin
+        this.taskPluginManager.installPlugin();
+
         // worker registry
         try {
             this.workerRegistryClient.registry();