Browse Source

Fix task group cannot release when kill task (#13314)

Wenjun Ruan 2 years ago
parent
commit
52134277a3

+ 30 - 15
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@@ -65,6 +65,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
 import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.remote.command.HostUpdateCommand;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@@ -376,23 +377,33 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
     public boolean checkForceStartAndWakeUp(StateEvent stateEvent) {
         TaskGroupQueue taskGroupQueue = this.processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId());
         if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) {
+            logger.info("Begin to force start taskGroupQueue: {}", taskGroupQueue.getId());
             TaskInstance taskInstance = this.taskInstanceDao.findTaskInstanceById(stateEvent.getTaskInstanceId());
             ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
             taskProcessor.action(TaskAction.DISPATCH);
             this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(),
                     TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
+            logger.info("Success force start taskGroupQueue: {}", taskGroupQueue.getId());
             return true;
         }
         if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) {
+            logger.info("Begin to wake up taskGroupQueue: {}", taskGroupQueue.getId());
             boolean acquireTaskGroup = processService.robTaskGroupResource(taskGroupQueue);
             if (acquireTaskGroup) {
                 TaskInstance taskInstance = this.taskInstanceDao.findTaskInstanceById(stateEvent.getTaskInstanceId());
                 ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
                 taskProcessor.action(TaskAction.DISPATCH);
+                logger.info("Success wake up taskGroupQueue: {}", taskGroupQueue.getId());
                 return true;
             }
+            logger.warn("Failed to wake up taskGroupQueue, taskGroupQueueId: {}", taskGroupQueue.getId());
+            return false;
+        } else {
+            logger.info(
+                    "Failed to wake up the taskGroupQueue: {}, since the taskGroupQueue is not in queue, will no need to wake up.",
+                    taskGroupQueue);
+            return true;
         }
-        return false;
     }
 
     public void processTimeout() {
@@ -464,7 +475,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
      *
      */
     public void releaseTaskGroup(TaskInstance taskInstance) {
-        logger.info("Release task group");
         if (taskInstance.getTaskGroupId() > 0) {
             TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance);
             if (nextTaskInstance != null) {
@@ -1816,19 +1826,24 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
             if (taskInstanceId == null || taskInstanceId.equals(0)) {
                 continue;
             }
-            TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstanceId);
-            if (taskInstance == null || taskInstance.getState().isFinished()) {
-                continue;
-            }
-            taskProcessor.action(TaskAction.STOP);
-            if (taskProcessor.taskInstance().getState().isFinished()) {
-                TaskStateEvent taskStateEvent = TaskStateEvent.builder()
-                        .processInstanceId(processInstance.getId())
-                        .taskInstanceId(taskInstance.getId())
-                        .status(taskProcessor.taskInstance().getState())
-                        .type(StateEventType.TASK_STATE_CHANGE)
-                        .build();
-                this.addStateEvent(taskStateEvent);
+            LogUtils.setWorkflowAndTaskInstanceIDMDC(processInstance.getId(), taskInstanceId);
+            try {
+                TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstanceId);
+                if (taskInstance == null || taskInstance.getState().isFinished()) {
+                    continue;
+                }
+                taskProcessor.action(TaskAction.STOP);
+                if (taskProcessor.taskInstance().getState().isFinished()) {
+                    TaskStateEvent taskStateEvent = TaskStateEvent.builder()
+                            .processInstanceId(processInstance.getId())
+                            .taskInstanceId(taskInstance.getId())
+                            .status(taskProcessor.taskInstance().getState())
+                            .type(StateEventType.TASK_STATE_CHANGE)
+                            .build();
+                    this.addStateEvent(taskStateEvent);
+                }
+            } finally {
+                LogUtils.removeWorkflowAndTaskInstanceIdMDC();
             }
         }
     }

+ 6 - 4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java

@@ -146,11 +146,13 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
     public boolean killTask() {
 
         try {
-            taskInstance = taskInstanceDao.findTaskInstanceById(taskInstance.getId());
+            logger.info("Begin to kill task: {}", taskInstance.getName());
             if (taskInstance == null) {
+                logger.warn("Kill task failed, the task instance is not exist");
                 return true;
             }
             if (taskInstance.getState().isFinished()) {
+                logger.warn("Kill task failed, the task instance is already finished");
                 return true;
             }
             // we don't wait the kill response
@@ -161,12 +163,12 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
                 killRemoteTask();
             }
         } catch (Exception e) {
-            logger.error("master kill task error, taskInstance id: {}", taskInstance.getId(), e);
+            logger.error("Master kill task: {} error, taskInstance id: {}", taskInstance.getName(),
+                    taskInstance.getId(), e);
             return false;
         }
 
-        logger.info("master success kill taskInstance name: {} taskInstance id: {}",
-                taskInstance.getName(), taskInstance.getId());
+        logger.info("Master success kill task: {}, taskInstanceId: {}", taskInstance.getName(), taskInstance.getId());
         return true;
     }
 

+ 7 - 1
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@@ -2537,6 +2537,10 @@ public class ProcessServiceImpl implements ProcessService {
                     logger.info("The taskGroupQueue's status is release, taskInstanceId: {}", taskInstance.getId());
                     return null;
                 }
+                if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.WAIT_QUEUE) {
+                    logger.info("The taskGroupQueue's status is in waiting, will not need to release task group");
+                    break;
+                }
             } while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode()
                     && taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(),
                             taskGroup.getUseSize(),
@@ -2563,7 +2567,8 @@ public class ProcessServiceImpl implements ProcessService {
         } while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(),
                 Flag.YES.getCode(),
                 taskGroupQueue.getId()) != 1);
-        logger.info("Finished to release task group queue: taskGroupId: {}", taskInstance.getTaskGroupId());
+        logger.info("Finished to release task group queue: taskGroupId: {}, taskGroupQueueId: {}",
+                taskInstance.getTaskGroupId(), taskGroupQueue.getId());
         return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId());
     }
 
@@ -2577,6 +2582,7 @@ public class ProcessServiceImpl implements ProcessService {
     @Override
     public void changeTaskGroupQueueStatus(int taskId, TaskGroupQueueStatus status) {
         TaskGroupQueue taskGroupQueue = taskGroupQueueMapper.queryByTaskId(taskId);
+        taskGroupQueue.setInQueue(Flag.NO.getCode());
         taskGroupQueue.setStatus(status);
         taskGroupQueue.setUpdateTime(new Date(System.currentTimeMillis()));
         taskGroupQueueMapper.updateById(taskGroupQueue);