|
@@ -37,7 +37,6 @@ import org.apache.dolphinscheduler.common.enums.FailureStrategy;
|
|
|
import org.apache.dolphinscheduler.common.enums.Flag;
|
|
|
import org.apache.dolphinscheduler.common.enums.ResourceType;
|
|
|
import org.apache.dolphinscheduler.common.enums.TaskDependType;
|
|
|
-import org.apache.dolphinscheduler.common.enums.TaskType;
|
|
|
import org.apache.dolphinscheduler.common.enums.WarningType;
|
|
|
import org.apache.dolphinscheduler.common.model.DateInterval;
|
|
|
import org.apache.dolphinscheduler.common.model.TaskNode;
|
|
@@ -263,7 +262,6 @@ public class ProcessService {
|
|
|
cmdTypeMap.put(CommandType.REPEAT_RUNNING, 1);
|
|
|
cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1);
|
|
|
cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS, 1);
|
|
|
- cmdTypeMap.put(CommandType.RESUME_FROM_FORCED_SUCCESS, 1);
|
|
|
CommandType commandType = command.getCommandType();
|
|
|
|
|
|
if (cmdTypeMap.containsKey(commandType)) {
|
|
@@ -710,6 +708,7 @@ public class ProcessService {
|
|
|
List<Integer> failedList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.FAILURE);
|
|
|
List<Integer> toleranceList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.NEED_FAULT_TOLERANCE);
|
|
|
List<Integer> killedList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.KILL);
|
|
|
+
|
|
|
cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
|
|
|
|
|
|
failedList.addAll(killedList);
|
|
@@ -774,31 +773,6 @@ public class ProcessService {
|
|
|
break;
|
|
|
case SCHEDULER:
|
|
|
break;
|
|
|
- case RESUME_FROM_FORCED_SUCCESS:
|
|
|
- List<Integer> failedSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(),
|
|
|
- new ExecutionStatus[]{ExecutionStatus.FAILURE, ExecutionStatus.KILL, ExecutionStatus.NEED_FAULT_TOLERANCE},
|
|
|
- TaskType.SUB_PROCESS);
|
|
|
-
|
|
|
- for (int i = 0; i < failedSubList.size(); i++) {
|
|
|
-
|
|
|
- if (haveForcedSuccessInSubProcess(failedSubList.get(i))) {
|
|
|
-
|
|
|
- TaskInstance taskInstance = this.findTaskInstanceById(failedSubList.get(i));
|
|
|
- taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
|
|
|
- updateTaskInstance(taskInstance);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- * set resume node list to null
|
|
|
- * 1. we can have a complete dag in the ExecThread so that it can restore the previous context
|
|
|
- * 2. each time the operation is done the state of process will be reasonable as usual
|
|
|
- */
|
|
|
- cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
|
|
|
- cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING,
|
|
|
- String.join(Constants.COMMA, convertIntListToString(null)));
|
|
|
- processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
|
|
|
- processInstance.setRunTimes(runTime + 1);
|
|
|
- break;
|
|
|
default:
|
|
|
break;
|
|
|
}
|
|
@@ -806,30 +780,6 @@ public class ProcessService {
|
|
|
return processInstance;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
- * recursively check if a sub process node contains forced success node
|
|
|
- * @param taskInstanceId task instance id
|
|
|
- * @return true or false
|
|
|
- */
|
|
|
- public boolean haveForcedSuccessInSubProcess(int taskInstanceId) {
|
|
|
- List<Integer> forcedSuccessList = this.findTaskIdInSubProcessByStatusAndType(taskInstanceId,
|
|
|
- new ExecutionStatus[]{ExecutionStatus.FORCED_SUCCESS},
|
|
|
- null);
|
|
|
- if (forcedSuccessList != null && !forcedSuccessList.isEmpty()) {
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
- List<Integer> childSubList = this.findTaskIdInSubProcessByStatusAndType(taskInstanceId,
|
|
|
- new ExecutionStatus[]{ExecutionStatus.FAILURE, ExecutionStatus.KILL, ExecutionStatus.NEED_FAULT_TOLERANCE},
|
|
|
- TaskType.SUB_PROCESS);
|
|
|
- for (Integer child : childSubList) {
|
|
|
- if (haveForcedSuccessInSubProcess(child)) {
|
|
|
- return true;
|
|
|
- }
|
|
|
- }
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
|
|
|
* return complement data if the process start with complement data
|
|
|
*
|
|
@@ -1413,40 +1363,6 @@ public class ProcessService {
|
|
|
return taskInstanceMapper.queryTaskByProcessIdAndState(instanceId, state.ordinal());
|
|
|
}
|
|
|
|
|
|
-
|
|
|
- * get id list by task state and type
|
|
|
- * @param processInstanceId process instance id
|
|
|
- * @param states task instance state array
|
|
|
- * @param taskType task type
|
|
|
- * @return task instance id list
|
|
|
- */
|
|
|
- public List<Integer> findTaskIdByInstanceStatusAndType(int processInstanceId, ExecutionStatus[] states, TaskType taskType) {
|
|
|
- int[] statesArray = new int[states.length];
|
|
|
- for (int i = 0; i < states.length; i++) {
|
|
|
- statesArray[i] = states[i].ordinal();
|
|
|
- }
|
|
|
- return taskInstanceMapper.queryTaskByProcessIdAndStateAndType(processInstanceId, statesArray, taskType.toString());
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- * get tasks in sub_process by sub_process task id and state and type
|
|
|
- * if param type is null, it queries all types
|
|
|
- * @param taskId task instance id
|
|
|
- * @param states task instance state array
|
|
|
- * @param taskType task type
|
|
|
- * @return task instance id list
|
|
|
- */
|
|
|
- public List<Integer> findTaskIdInSubProcessByStatusAndType(int taskId, ExecutionStatus[] states, TaskType taskType) {
|
|
|
- int[] statesArray = new int[states.length];
|
|
|
- for (int i = 0; i < states.length; i++) {
|
|
|
- statesArray[i] = states[i].ordinal();
|
|
|
- }
|
|
|
- if (taskType == null) {
|
|
|
- return taskInstanceMapper.queryTaskBySubProcessTaskIdAndStateAndType(taskId, statesArray, null);
|
|
|
- }
|
|
|
- return taskInstanceMapper.queryTaskBySubProcessTaskIdAndStateAndType(taskId, statesArray, taskType.toString());
|
|
|
- }
|
|
|
-
|
|
|
|
|
|
* find valid task list by process definition id
|
|
|
*
|