Parcourir la source

[Fix-7538] [server] Fix when there is a forbidden node in dag, the execution flow is abnormal (#7613)

* when there is a forbidden node in dag, the execution flow is abnormal
Co-authored-by: hongjie.li <hongjie.li@dmall.com>
lhjzmn il y a 3 ans
Parent
commit
3af4d765c2

+ 104 - 87
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@@ -216,19 +216,19 @@ public class WorkflowExecuteThread {
     /**
      * constructor of WorkflowExecuteThread
      *
-     * @param processInstance processInstance
-     * @param processService processService
-     * @param nettyExecutorManager nettyExecutorManager
-     * @param processAlertManager processAlertManager
-     * @param masterConfig masterConfig
+     * @param processInstance         processInstance
+     * @param processService          processService
+     * @param nettyExecutorManager    nettyExecutorManager
+     * @param processAlertManager     processAlertManager
+     * @param masterConfig            masterConfig
      * @param stateWheelExecuteThread stateWheelExecuteThread
      */
     public WorkflowExecuteThread(ProcessInstance processInstance
-            , ProcessService processService
-            , NettyExecutorManager nettyExecutorManager
-            , ProcessAlertManager processAlertManager
-            , MasterConfig masterConfig
-            , StateWheelExecuteThread stateWheelExecuteThread) {
+        , ProcessService processService
+        , NettyExecutorManager nettyExecutorManager
+        , ProcessAlertManager processAlertManager
+        , MasterConfig masterConfig
+        , StateWheelExecuteThread stateWheelExecuteThread) {
         this.processService = processService;
         this.processInstance = processInstance;
         this.masterConfig = masterConfig;
@@ -265,14 +265,14 @@ public class WorkflowExecuteThread {
 
     public String getKey() {
         if (StringUtils.isNotEmpty(key)
-                || this.processDefinition == null) {
+            || this.processDefinition == null) {
             return key;
         }
 
         key = String.format("%d_%d_%d",
-                this.processDefinition.getCode(),
-                this.processDefinition.getVersion(),
-                this.processInstance.getId());
+            this.processDefinition.getCode(),
+            this.processDefinition.getVersion(),
+            this.processInstance.getId());
         return key;
     }
 
@@ -400,7 +400,7 @@ public class WorkflowExecuteThread {
                     } else {
                         ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
                         this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(),
-                                org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
+                            org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
                     }
                 }
             }
@@ -420,19 +420,19 @@ public class WorkflowExecuteThread {
 
     private void taskFinished(TaskInstance task) {
         logger.info("work flow {} task {} state:{} ",
-                processInstance.getId(),
-                task.getId(),
-                task.getState());
+            processInstance.getId(),
+            task.getId(),
+            task.getState());
         if (task.taskCanRetry()) {
             addTaskToStandByList(task);
             if (!task.retryTaskIntervalOverTime()) {
                 logger.info("failure task will be submitted: process id: {}, task instance id: {} state:{} retry times:{} / {}, interval:{}",
-                        processInstance.getId(),
-                        task.getId(),
-                        task.getState(),
-                        task.getRetryTimes(),
-                        task.getMaxRetryTimes(),
-                        task.getRetryInterval());
+                    processInstance.getId(),
+                    task.getId(),
+                    task.getState(),
+                    task.getRetryTimes(),
+                    task.getMaxRetryTimes(),
+                    task.getRetryInterval());
                 stateWheelExecuteThread.addTask4TimeoutCheck(task);
                 stateWheelExecuteThread.addTask4RetryCheck(task);
             } else {
@@ -454,7 +454,7 @@ public class WorkflowExecuteThread {
             submitPostNode(Long.toString(task.getTaskCode()));
         } else if (task.getState().typeIsFailure()) {
             if (task.isConditionsTask()
-                    || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
+                || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
                 submitPostNode(Long.toString(task.getTaskCode()));
             } else {
                 errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
@@ -473,7 +473,7 @@ public class WorkflowExecuteThread {
         logger.info("process instance update: {}", processInstanceId);
         processInstance = processService.findProcessInstanceById(processInstanceId);
         processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
-                processInstance.getProcessDefinitionVersion());
+            processInstance.getProcessDefinitionVersion());
         processInstance.setProcessDefinition(processDefinition);
     }
 
@@ -502,8 +502,8 @@ public class WorkflowExecuteThread {
     public boolean checkProcessInstance(StateEvent stateEvent) {
         if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) {
             logger.error("mismatch process instance id: {}, state event:{}",
-                    this.processInstance.getId(),
-                    stateEvent);
+                this.processInstance.getId(),
+                stateEvent);
             return false;
         }
         return true;
@@ -603,9 +603,9 @@ public class WorkflowExecuteThread {
                 return true;
             }
             logger.info("process complement continue. process id:{}, schedule time:{} complementListDate:{}",
-                    processInstance.getId(),
-                    processInstance.getScheduleTime(),
-                    complementListDate.toString());
+                processInstance.getId(),
+                processInstance.getScheduleTime(),
+                complementListDate.toString());
             scheduleDate = complementListDate.get(index + 1);
             //the next process complement
             processInstance.setId(0);
@@ -619,9 +619,9 @@ public class WorkflowExecuteThread {
 
         processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
         processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
-                processDefinition.getGlobalParamMap(),
-                processDefinition.getGlobalParamList(),
-                CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
+            processDefinition.getGlobalParamMap(),
+            processDefinition.getGlobalParamList(),
+            CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
         processInstance.setStartTime(new Date());
         processInstance.setEndTime(null);
         processService.saveProcessInstance(processInstance);
@@ -632,7 +632,7 @@ public class WorkflowExecuteThread {
 
     private boolean needComplementProcess() {
         if (processInstance.isComplementData()
-                && Flag.NO == processInstance.getIsSubProcess()) {
+            && Flag.NO == processInstance.getIsSubProcess()) {
             return true;
         }
         return false;
@@ -709,7 +709,7 @@ public class WorkflowExecuteThread {
             return;
         }
         processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
-                processInstance.getProcessDefinitionVersion());
+            processInstance.getProcessDefinitionVersion());
         processInstance.setProcessDefinition(processDefinition);
 
         List<TaskInstance> recoverNodeList = getStartTaskInstanceList(processInstance.getCommandParam());
@@ -729,7 +729,7 @@ public class WorkflowExecuteThread {
         List<String> recoveryNodeCodeList = getRecoveryNodeCodeList(recoverNodeList);
         List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
         ProcessDag processDag = generateFlowDag(taskNodeList,
-                startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType());
+            startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType());
         if (processDag == null) {
             logger.error("processDag is null");
             return;
@@ -776,14 +776,14 @@ public class WorkflowExecuteThread {
                 if (complementListDate.size() == 0 && needComplementProcess()) {
                     complementListDate = CronUtils.getSelfFireDateList(start, end, schedules);
                     logger.info(" process definition code:{} complement data: {}",
-                            processInstance.getProcessDefinitionCode(), complementListDate.toString());
+                        processInstance.getProcessDefinitionCode(), complementListDate.toString());
 
                     if (complementListDate.size() > 0 && Flag.NO == processInstance.getIsSubProcess()) {
                         processInstance.setScheduleTime(complementListDate.get(0));
                         processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
-                                processDefinition.getGlobalParamMap(),
-                                processDefinition.getGlobalParamList(),
-                                CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
+                            processDefinition.getGlobalParamMap(),
+                            processDefinition.getGlobalParamList(),
+                            CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
                         processService.updateProcessInstance(processInstance);
                     }
                 }
@@ -801,7 +801,7 @@ public class WorkflowExecuteThread {
         try {
             ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
             if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION
-                    && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
+                && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
                 notifyProcessHostUpdate(taskInstance);
             }
             // package task instance before submit
@@ -810,8 +810,8 @@ public class WorkflowExecuteThread {
             boolean submit = taskProcessor.submit(taskInstance, processInstance, masterConfig.getTaskCommitRetryTimes(), masterConfig.getTaskCommitInterval(), masterConfig.isTaskLogger());
             if (!submit) {
                 logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!",
-                        processInstance.getId(), processInstance.getName(),
-                        taskInstance.getId(), taskInstance.getName());
+                    processInstance.getId(), processInstance.getName(),
+                    taskInstance.getId(), taskInstance.getName());
                 return null;
             }
             validTaskMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance.getId());
@@ -857,7 +857,7 @@ public class WorkflowExecuteThread {
      * find task instance in db.
      * in case submit more than one same name task in the same time.
      *
-     * @param taskCode task code
+     * @param taskCode    task code
      * @param taskVersion task version
      * @return TaskInstance
      */
@@ -875,7 +875,7 @@ public class WorkflowExecuteThread {
      * encapsulation task
      *
      * @param processInstance process instance
-     * @param taskNode taskNode
+     * @param taskNode        taskNode
      * @return TaskInstance
      */
     private TaskInstance createTaskInstance(ProcessInstance processInstance, TaskNode taskNode) {
@@ -1083,34 +1083,51 @@ public class WorkflowExecuteThread {
             return DependResult.SUCCESS;
         }
         TaskNode taskNode = dag.getNode(taskCode);
-        List<String> depCodeList = taskNode.getDepList();
-        for (String depsNode : depCodeList) {
-            if (!dag.containsNode(depsNode)
-                    || forbiddenTaskMap.containsKey(depsNode)
-                    || skipTaskNodeMap.containsKey(depsNode)) {
-                continue;
-            }
-            // dependencies must be fully completed
-            if (!completeTaskMap.containsKey(depsNode)) {
-                return DependResult.WAITING;
-            }
-            Integer depsTaskId = completeTaskMap.get(depsNode);
-            ExecutionStatus depTaskState = taskInstanceMap.get(depsTaskId).getState();
-            if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) {
-                return DependResult.NON_EXEC;
-            }
-            // ignore task state if current task is condition
-            if (taskNode.isConditionsTask()) {
-                continue;
-            }
-            if (!dependTaskSuccess(depsNode, taskCode)) {
-                return DependResult.FAILED;
+        List<String> indirectDepCodeList = new ArrayList<>();
+        setIndirectDepList(taskCode, indirectDepCodeList);
+        for (String depsNode : indirectDepCodeList) {
+            if (dag.containsNode(depsNode) && !skipTaskNodeMap.containsKey(depsNode)) {
+                // dependencies must be fully completed
+                if (!completeTaskMap.containsKey(depsNode)) {
+                    return DependResult.WAITING;
+                }
+                Integer depsTaskId = completeTaskMap.get(depsNode);
+                ExecutionStatus depTaskState = taskInstanceMap.get(depsTaskId).getState();
+                if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) {
+                    return DependResult.NON_EXEC;
+                }
+                // ignore task state if current task is condition
+                if (taskNode.isConditionsTask()) {
+                    continue;
+                }
+                if (!dependTaskSuccess(depsNode, taskCode)) {
+                    return DependResult.FAILED;
+                }
             }
         }
         logger.info("taskCode: {} completeDependTaskList: {}", taskCode, Arrays.toString(completeTaskMap.keySet().toArray()));
         return DependResult.SUCCESS;
     }
 
+    /**
+     * This function is specially used to handle the dependency situation where the parent node is a prohibited node.
+     * When the parent node is a forbidden node, the dependency relationship should continue to be traced
+     *
+     * @param taskCode            taskCode
+     * @param indirectDepCodeList All indirectly dependent nodes
+     */
+    private void setIndirectDepList(String taskCode, List<String> indirectDepCodeList) {
+        TaskNode taskNode = dag.getNode(taskCode);
+        List<String> depCodeList = taskNode.getDepList();
+        for (String depsNode : depCodeList) {
+            if (forbiddenTaskMap.containsKey(depsNode)) {
+                setIndirectDepList(depsNode, indirectDepCodeList);
+            } else {
+                indirectDepCodeList.add(depsNode);
+            }
+        }
+    }
+
     /**
      * depend node is completed, but here need check the condition task branch is the next node
      */
@@ -1156,9 +1173,9 @@ public class WorkflowExecuteThread {
      */
     private ExecutionStatus runningState(ExecutionStatus state) {
         if (state == ExecutionStatus.READY_STOP
-                || state == ExecutionStatus.READY_PAUSE
-                || state == ExecutionStatus.WAITING_THREAD
-                || state == ExecutionStatus.DELAY_EXECUTION) {
+            || state == ExecutionStatus.READY_PAUSE
+            || state == ExecutionStatus.WAITING_THREAD
+            || state == ExecutionStatus.DELAY_EXECUTION) {
             // if the running task is not completed, the state remains unchanged
             return state;
         } else {
@@ -1224,8 +1241,8 @@ public class WorkflowExecuteThread {
 
         List<TaskInstance> pauseList = getCompleteTaskByState(ExecutionStatus.PAUSE);
         if (CollectionUtils.isNotEmpty(pauseList)
-                || !isComplementEnd()
-                || readyToSubmitTaskQueue.size() > 0) {
+            || !isComplementEnd()
+            || readyToSubmitTaskQueue.size() > 0) {
             return ExecutionStatus.PAUSE;
         } else {
             return ExecutionStatus.SUCCESS;
@@ -1264,8 +1281,8 @@ public class WorkflowExecuteThread {
             List<TaskInstance> stopList = getCompleteTaskByState(ExecutionStatus.STOP);
             List<TaskInstance> killList = getCompleteTaskByState(ExecutionStatus.KILL);
             if (CollectionUtils.isNotEmpty(stopList)
-                    || CollectionUtils.isNotEmpty(killList)
-                    || !isComplementEnd()) {
+                || CollectionUtils.isNotEmpty(killList)
+                || !isComplementEnd()) {
                 return ExecutionStatus.STOP;
             } else {
                 return ExecutionStatus.SUCCESS;
@@ -1318,10 +1335,10 @@ public class WorkflowExecuteThread {
         ExecutionStatus state = getProcessInstanceState(processInstance);
         if (processInstance.getState() != state) {
             logger.info(
-                    "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
-                    processInstance.getId(), processInstance.getName(),
-                    processInstance.getState(), state,
-                    processInstance.getCommandType());
+                "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
+                processInstance.getId(), processInstance.getName(),
+                processInstance.getState(), state,
+                processInstance.getCommandType());
 
             processInstance.setState(state);
             if (state.typeIsFinished()) {
@@ -1370,14 +1387,14 @@ public class WorkflowExecuteThread {
      */
     private void removeTaskFromStandbyList(TaskInstance taskInstance) {
         logger.info("remove task from stand by list, id: {} name:{}",
-                taskInstance.getId(),
-                taskInstance.getName());
+            taskInstance.getId(),
+            taskInstance.getName());
         try {
             readyToSubmitTaskQueue.remove(taskInstance);
         } catch (Exception e) {
             logger.error("remove task instance from readyToSubmitTaskQueue error, task id:{}, Name: {}",
-                    taskInstance.getId(),
-                    taskInstance.getName(), e);
+                taskInstance.getId(),
+                taskInstance.getName(), e);
         }
     }
 
@@ -1400,7 +1417,7 @@ public class WorkflowExecuteThread {
      */
     private void killAllTasks() {
         logger.info("kill called on process instance id: {}, num: {}", processInstance.getId(),
-                activeTaskProcessorMaps.size());
+            activeTaskProcessorMaps.size());
         for (int taskId : activeTaskProcessorMaps.keySet()) {
             TaskInstance taskInstance = processService.findTaskInstanceById(taskId);
             if (taskInstance == null || taskInstance.getState().typeIsFinished()) {
@@ -1567,10 +1584,10 @@ public class WorkflowExecuteThread {
     /**
      * generate flow dag
      *
-     * @param totalTaskNodeList total task node list
-     * @param startNodeNameList start node name list
+     * @param totalTaskNodeList    total task node list
+     * @param startNodeNameList    start node name list
      * @param recoveryNodeCodeList recovery node code list
-     * @param depNodeType depend node type
+     * @param depNodeType          depend node type
      * @return ProcessDag           process dag
      * @throws Exception exception
      */