Selaa lähdekoodia

fix process pause and rerun (#9568)

caishunfeng 3 vuotta sitten
vanhempi
commit
63638601b0

+ 15 - 29
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@@ -84,7 +84,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -642,6 +641,18 @@ public class WorkflowExecuteThread {
         return null;
     }
 
+    public TaskInstance getTaskInstance(long taskCode) {
+        if (taskInstanceMap == null || taskInstanceMap.size() == 0) {
+            return null;
+        }
+        for (TaskInstance taskInstance : taskInstanceMap.values()) {
+            if (taskInstance.getTaskCode() == taskCode) {
+                return taskInstance;
+            }
+        }
+        return null;
+    }
+
     public TaskInstance getActiveTaskInstanceByTaskCode(long taskCode) {
         if (activeTaskProcessorMaps.containsKey(taskCode)) {
             return activeTaskProcessorMaps.get(taskCode).taskInstance();
@@ -1281,7 +1292,9 @@ public class WorkflowExecuteThread {
         List<TaskInstance> taskInstances = new ArrayList<>();
         for (String taskNode : submitTaskNodeList) {
             TaskNode taskNodeObject = dag.getNode(taskNode);
-            if (checkTaskInstanceByCode(taskNodeObject.getCode())) {
+            TaskInstance existTaskInstance = getTaskInstance(taskNodeObject.getCode());
+            if (existTaskInstance != null) {
+                taskInstances.add(existTaskInstance);
                 continue;
             }
             TaskInstance task = createTaskInstance(processInstance, taskNodeObject);
@@ -1693,12 +1706,6 @@ public class WorkflowExecuteThread {
                 logger.warn("task was found in ready submit queue, task code:{}", taskInstance.getTaskCode());
                 return;
             }
-            // need to check if the tasks with same task code is active
-            boolean active = hadNotFailTask(taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
-            if (active) {
-                logger.warn("task was found in active task list, task code:{}", taskInstance.getTaskCode());
-                return;
-            }
             logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}",
                 taskInstance.getName(), taskInstance.getId(), taskInstance.getTaskCode());
             readyToSubmitTaskQueue.put(taskInstance);
@@ -1950,25 +1957,4 @@ public class WorkflowExecuteThread {
         }
     }
 
-    /**
-     * check if had not fail task by taskCode and version
-     *
-     * @param taskCode
-     * @param version
-     * @return
-     */
-    private boolean hadNotFailTask(long taskCode, int version) {
-        boolean result = false;
-        for (Entry<Integer, TaskInstance> entry : taskInstanceMap.entrySet()) {
-            TaskInstance taskInstance = entry.getValue();
-            if (taskInstance.getTaskCode() == taskCode && taskInstance.getTaskDefinitionVersion() == version) {
-                if (!taskInstance.getState().typeIsFailure()) {
-                    result = true;
-                    break;
-                }
-            }
-        }
-        return result;
-    }
-
 }

+ 5 - 4
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@@ -899,7 +899,7 @@ public class ProcessServiceImpl implements ProcessService {
         } else {
             processInstance = this.findProcessInstanceDetailById(processInstanceId);
             if (processInstance == null) {
-                return processInstance;
+                return null;
             }
         }
         if (cmdParam != null) {
@@ -1482,12 +1482,13 @@ public class ProcessServiceImpl implements ProcessService {
     @Override
     public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) {
         ExecutionStatus processInstanceState = processInstance.getState();
-        if (processInstanceState.typeIsFinished()
-            || processInstanceState == ExecutionStatus.READY_PAUSE
-            || processInstanceState == ExecutionStatus.READY_STOP) {
+        if (processInstanceState.typeIsFinished() || processInstanceState == ExecutionStatus.READY_STOP) {
             logger.warn("processInstance {} was {}, skip submit task", processInstance.getProcessDefinitionCode(), processInstanceState);
             return null;
         }
+        if (processInstanceState == ExecutionStatus.READY_PAUSE) {
+            taskInstance.setState(ExecutionStatus.PAUSE);
+        }
         taskInstance.setExecutorId(processInstance.getExecutorId());
         taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
         taskInstance.setState(getSubmitTaskState(taskInstance, processInstance));