Browse Source

[Bug-9501][Worker] fix kill task error before running (#9509)

caishunfeng 3 years ago
parent
commit
66d148872d

+ 3 - 1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java

@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.builder;
 
 import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
 
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
 import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -27,6 +26,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
 
 /**
@@ -60,6 +61,7 @@ public class TaskExecutionContextBuilder {
         taskExecutionContext.setDelayTime(taskInstance.getDelayTime());
         taskExecutionContext.setVarPool(taskInstance.getVarPool());
         taskExecutionContext.setDryRun(taskInstance.getDryRun());
+        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUBMITTED_SUCCESS);
         return this;
     }
 

+ 3 - 12
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@@ -85,17 +85,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
     @Autowired
     private WorkerManagerThread workerManager;
 
-    /**
-     * Pre-cache task to avoid extreme situations when kill task. There is no such task in the cache
-     *
-     * @param taskExecutionContext task
-     */
-    private void setTaskCache(TaskExecutionContext taskExecutionContext) {
-        TaskExecutionContext preTaskCache = new TaskExecutionContext();
-        preTaskCache.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-        TaskExecutionContextCacheManager.cacheTaskExecutionContext(preTaskCache);
-    }
-
     @Override
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
@@ -118,7 +107,9 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
             return;
         }
 
-        setTaskCache(taskExecutionContext);
+        // set cache, it will be used when kill task
+        TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
+
         // todo custom logger
 
         taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));

+ 17 - 17
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@@ -86,16 +86,26 @@ public class TaskKillProcessor implements NettyRequestProcessor {
         }
         logger.info("task kill command : {}", killCommand);
 
-        Pair<Boolean, List<String>> result = doKill(killCommand);
-
-        taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(),
-                new NettyRemoteChannel(channel, command.getOpaque()));
-
-        TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
+        int taskInstanceId = killCommand.getTaskInstanceId();
+        TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
         if (taskExecutionContext == null) {
             logger.error("taskRequest cache is null, taskInstanceId: {}", killCommand.getTaskInstanceId());
             return;
         }
+
+        Integer processId = taskExecutionContext.getProcessId();
+        if (processId.equals(0)) {
+            workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
+            TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
+            logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId);
+            return;
+        }
+
+        Pair<Boolean, List<String>> result = doKill(taskExecutionContext);
+
+        taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(),
+                new NettyRemoteChannel(channel, command.getOpaque()));
+
         taskExecutionContext.setCurrentExecutionStatus(result.getLeft() ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE);
         taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight()));
 
@@ -110,21 +120,11 @@ public class TaskKillProcessor implements NettyRequestProcessor {
      *
      * @return kill result
      */
-    private Pair<Boolean, List<String>> doKill(TaskKillRequestCommand killCommand) {
+    private Pair<Boolean, List<String>> doKill(TaskExecutionContext taskExecutionContext) {
         boolean processFlag = true;
         List<String> appIds = Collections.emptyList();
-        int taskInstanceId = killCommand.getTaskInstanceId();
-        TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
 
         try {
-            Integer processId = taskExecutionContext.getProcessId();
-            if (processId.equals(0)) {
-                workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
-                TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
-                logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId);
-                return Pair.of(true, appIds);
-            }
-
             String pidsStr = ProcessUtils.getPidsStr(taskExecutionContext.getProcessId());
             if (!StringUtils.isEmpty(pidsStr)) {
                 String cmd = String.format("kill -9 %s", pidsStr);

+ 6 - 10
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java

@@ -17,26 +17,24 @@
 
 package org.apache.dolphinscheduler.server.worker.runner;
 
-import org.apache.dolphinscheduler.common.enums.Event;
 import org.apache.dolphinscheduler.common.storage.StorageOperate;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
-import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
 
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
 /**
  * Manage tasks
  */
@@ -105,9 +103,7 @@ public class WorkerManagerThread implements Runnable {
         if (taskExecutionContext == null) {
             return;
         }
-        TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(), taskExecutionContext.getProcessInstanceId());
-        responseCommand.setStatus(ExecutionStatus.KILL.getCode());
-        ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
+        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.KILL);
         taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
     }