Browse Source

Kill task when we do master failover to avoid task lost (#10997)

Wenjun Ruan 2 years ago
parent
commit
998e4d74dd

+ 20 - 1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java

@@ -28,8 +28,12 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
+import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
+import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
 import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
 import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
@@ -63,13 +67,17 @@ public class MasterFailoverService {
     private final ProcessService processService;
     private final String localAddress;
 
+    private final NettyExecutorManager nettyExecutorManager;
+
     public MasterFailoverService(@NonNull RegistryClient registryClient,
                                  @NonNull MasterConfig masterConfig,
-                                 @NonNull ProcessService processService) {
+                                 @NonNull ProcessService processService,
+                                 @NonNull NettyExecutorManager nettyExecutorManager) {
         this.registryClient = registryClient;
         this.masterConfig = masterConfig;
         this.processService = processService;
         this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
+        this.nettyExecutorManager = nettyExecutorManager;
 
     }
 
@@ -221,6 +229,17 @@ public class MasterFailoverService {
                 LOGGER.info("TaskInstance failover begin kill the task related yarn job");
                 ProcessUtils.killYarnJob(taskExecutionContext);
             }
+            // kill worker task, When the master failover and worker failover happened in the same time,
+            // the task may not be failover if we don't set NEED_FAULT_TOLERANCE.
+            // This can be improved if we can load all task when cache a workflowInstance in memory
+            try {
+                TaskKillRequestCommand killCommand = new TaskKillRequestCommand(taskInstance.getId());
+                Host workerHost = Host.of(taskInstance.getHost());
+                nettyExecutorManager.doExecute(workerHost, killCommand.convert2Command());
+                LOGGER.info("Failover task success, has killed the task in worker: {}", taskInstance.getHost());
+            } catch (ExecuteException e) {
+                LOGGER.error("Kill task failed", e);
+            }
         } else {
             LOGGER.info("The failover taskInstance is a master task");
         }

+ 5 - 1
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java

@@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
 import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
@@ -82,6 +83,9 @@ public class FailoverServiceTest {
     @Mock
     private ProcessInstanceExecCacheManager cacheManager;
 
+    @Mock
+    private NettyExecutorManager nettyExecutorManager;
+
     private static int masterPort = 5678;
     private static int workerPort = 1234;
 
@@ -100,7 +104,7 @@ public class FailoverServiceTest {
 
         given(masterConfig.getListenPort()).willReturn(masterPort);
         MasterFailoverService masterFailoverService =
-            new MasterFailoverService(registryClient, masterConfig, processService);
+            new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager);
         WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient,
             masterConfig,
             processService,

+ 7 - 15
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java

@@ -21,9 +21,16 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 
 import java.io.Serializable;
 
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
 /**
  *  kill task request command
  */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
 public class TaskKillRequestCommand implements Serializable {
 
     /**
@@ -31,14 +38,6 @@ public class TaskKillRequestCommand implements Serializable {
      */
     private int taskInstanceId;
 
-    public int getTaskInstanceId() {
-        return taskInstanceId;
-    }
-
-    public void setTaskInstanceId(int taskInstanceId) {
-        this.taskInstanceId = taskInstanceId;
-    }
-
     /**
      *  package request command
      *
@@ -51,11 +50,4 @@ public class TaskKillRequestCommand implements Serializable {
         command.setBody(body);
         return command;
     }
-
-    @Override
-    public String toString() {
-        return "TaskKillRequestCommand{"
-                + "taskInstanceId=" + taskInstanceId
-                + '}';
-    }
 }