Browse Source

[Fix-14149][master] task finished so not dispatch (#14161)

eye-gu 1 year ago
parent
commit
7ae4fb3787

+ 32 - 1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@@ -82,6 +82,7 @@ import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph;
 import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
 import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
 import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
+import org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
 import org.apache.dolphinscheduler.server.master.utils.TaskUtils;
 import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
@@ -122,6 +123,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.springframework.beans.BeanUtils;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -983,7 +985,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
                     }
                 }
                 // 4. submit to dispatch queue
-                taskExecuteRunnable.dispatch();
+                tryToDispatchTaskInstance(taskInstance, taskExecuteRunnable);
 
                 stateWheelExecuteThread.addTask4TimeoutCheck(workflowInstance, taskInstance);
                 return true;
@@ -996,6 +998,35 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
         }
     }
 
+    /**
+     * Sometimes (such as pause), if the task instance status has already been finished,
+     * there is no need to dispatch it
+     */
+    @VisibleForTesting
+    void tryToDispatchTaskInstance(TaskInstance taskInstance, TaskExecuteRunnable taskExecuteRunnable) {
+        if (!taskInstance.getState().isFinished()) {
+            taskExecuteRunnable.dispatch();
+        } else {
+            if (workflowExecuteContext.getWorkflowInstance().isBlocked()) {
+                TaskStateEvent processBlockEvent = TaskStateEvent.builder()
+                        .processInstanceId(workflowExecuteContext.getWorkflowInstance().getId())
+                        .taskInstanceId(taskInstance.getId())
+                        .status(taskInstance.getState())
+                        .type(StateEventType.PROCESS_BLOCKED)
+                        .build();
+                this.stateEvents.add(processBlockEvent);
+            }
+
+            TaskStateEvent taskStateChangeEvent = TaskStateEvent.builder()
+                    .processInstanceId(workflowExecuteContext.getWorkflowInstance().getId())
+                    .taskInstanceId(taskInstance.getId())
+                    .status(taskInstance.getState())
+                    .type(StateEventType.TASK_STATE_CHANGE)
+                    .build();
+            this.stateEvents.add(taskStateChangeEvent);
+        }
+    }
+
     /**
      * find task instance in db.
      * in case submit more than one same name task in the same time.

+ 18 - 0
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java

@@ -35,9 +35,11 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph;
 import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
+import org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.command.CommandService;
@@ -371,4 +373,20 @@ public class WorkflowExecuteRunnableTest {
         return schedulerList;
     }
 
+    @Test
+    void testTryToDispatchTaskInstance() {
+        // task instance already finished, not dispatch
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setState(TaskExecutionStatus.PAUSE);
+        Mockito.when(processInstance.isBlocked()).thenReturn(true);
+        TaskExecuteRunnable taskExecuteRunnable = Mockito.mock(TaskExecuteRunnable.class);
+        workflowExecuteThread.tryToDispatchTaskInstance(taskInstance, taskExecuteRunnable);
+        Mockito.verify(taskExecuteRunnable, Mockito.never()).dispatch();
+
+        // submit success should dispatch
+        taskInstance = new TaskInstance();
+        taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
+        workflowExecuteThread.tryToDispatchTaskInstance(taskInstance, taskExecuteRunnable);
+        Mockito.verify(taskExecuteRunnable).dispatch();
+    }
 }