Sfoglia il codice sorgente

[Cherry-pick-dev][API] fix relation api bug (#8207)

* pick 8111

* fix relation delete (#8190)

* pick 8190
JinYong Li 3 anni fa
parent
commit
65744d4624

+ 2 - 42
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java

@@ -101,46 +101,6 @@ public class ProcessTaskRelationController extends BaseController {
         return returnDataList(result);
     }
 
-    /**
-     * move task to other processDefinition
-     *
-     * @param loginUser login user info
-     * @param projectCode project code
-     * @param processDefinitionCode process definition code
-     * @param targetProcessDefinitionCode target process definition code
-     * @param taskCode the current task code (the post task code)
-     * @return move result code
-     */
-    @ApiOperation(value = "moveRelation", notes = "MOVE_TASK_TO_OTHER_PROCESS_DEFINITION_NOTES")
-    @ApiImplicitParams({
-        @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"),
-        @ApiImplicitParam(name = "processDefinitionCode", value = "PROCESS_DEFINITION_CODE", required = true, type = "Long"),
-        @ApiImplicitParam(name = "targetProcessDefinitionCode", value = "TARGET_PROCESS_DEFINITION_CODE", required = true, type = "Long"),
-        @ApiImplicitParam(name = "taskCode", value = "TASK_CODE", required = true, type = "Long")
-    })
-    @PostMapping(value = "/move")
-    @ResponseStatus(HttpStatus.OK)
-    @ApiException(MOVE_PROCESS_TASK_RELATION_ERROR)
-    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
-    public Result moveTaskProcessRelation(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
-                                          @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
-                                          @RequestParam(name = "processDefinitionCode", required = true) long processDefinitionCode,
-                                          @RequestParam(name = "targetProcessDefinitionCode", required = true) long targetProcessDefinitionCode,
-                                          @RequestParam(name = "taskCode", required = true) long taskCode) {
-        Map<String, Object> result = new HashMap<>();
-        if (processDefinitionCode == 0L) {
-            putMsg(result, DATA_IS_NOT_VALID, "processDefinitionCode");
-        } else if (targetProcessDefinitionCode == 0L) {
-            putMsg(result, DATA_IS_NOT_VALID, "targetProcessDefinitionCode");
-        } else if (taskCode == 0L) {
-            putMsg(result, DATA_IS_NOT_VALID, "taskCode");
-        } else {
-            result = processTaskRelationService.moveTaskProcessRelation(loginUser, projectCode, processDefinitionCode,
-                targetProcessDefinitionCode, taskCode);
-        }
-        return returnDataList(result);
-    }
-
     /**
      * delete process task relation (delete task from workflow)
      *
@@ -179,7 +139,7 @@ public class ProcessTaskRelationController extends BaseController {
     @ApiOperation(value = "deleteUpstreamRelation", notes = "DELETE_UPSTREAM_RELATION_NOTES")
     @ApiImplicitParams({
         @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"),
-        @ApiImplicitParam(name = "preTaskCodes", value = "PRE_TASK_CODES", required = true, type = "String", example = "3,4"),
+        @ApiImplicitParam(name = "preTaskCodes", value = "PRE_TASK_CODES", required = true, type = "String", example = "1,2"),
         @ApiImplicitParam(name = "taskCode", value = "TASK_CODE", required = true, type = "Long")
     })
     @DeleteMapping(value = "/{taskCode}/upstream")
@@ -205,7 +165,7 @@ public class ProcessTaskRelationController extends BaseController {
     @ApiOperation(value = "deleteDownstreamRelation", notes = "DELETE_DOWNSTREAM_RELATION_NOTES")
     @ApiImplicitParams({
         @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"),
-        @ApiImplicitParam(name = "postTaskCodes", value = "POST_TASK_CODES", required = true, type = "String", example = "3,4"),
+        @ApiImplicitParam(name = "postTaskCodes", value = "POST_TASK_CODES", required = true, type = "String", example = "1,2"),
         @ApiImplicitParam(name = "taskCode", value = "TASK_CODE", required = true, type = "Long")
     })
     @DeleteMapping(value = "/{taskCode}/downstream")

+ 0 - 16
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java

@@ -42,22 +42,6 @@ public interface ProcessTaskRelationService {
                                                   long preTaskCode,
                                                   long postTaskCode);
 
-    /**
-     * move task to other processDefinition
-     *
-     * @param loginUser login user info
-     * @param projectCode project code
-     * @param processDefinitionCode process definition code
-     * @param targetProcessDefinitionCode target process definition code
-     * @param taskCode the current task code (the post task code)
-     * @return move result code
-     */
-    Map<String, Object> moveTaskProcessRelation(User loginUser,
-                                                long projectCode,
-                                                long processDefinitionCode,
-                                                long targetProcessDefinitionCode,
-                                                long taskCode);
-
     /**
      * delete process task relation
      *

+ 33 - 19
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@@ -600,29 +600,43 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
             throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
         }
-        int insertVersion;
-        if (processDefinition.equals(processDefinitionDeepCopy)) {
-            insertVersion = processDefinitionDeepCopy.getVersion();
-            ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper.queryByDefinitionCodeAndVersion(processDefinition.getCode(), insertVersion);
-            processDefinitionLog.setOperator(loginUser.getId());
-            processDefinitionLog.setOperateTime(new Date());
-            int update = processDefinitionLogMapper.updateById(processDefinitionLog);
+        boolean isChange = false;
+        if (processDefinition.equals(processDefinitionDeepCopy) && saveTaskResult == Constants.EXIT_CODE_SUCCESS) {
+            List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
+            if (taskRelationList.size() == processTaskRelationLogList.size()) {
+                Map<Long, ProcessTaskRelationLog> taskRelationLogMap =
+                    taskRelationList.stream().collect(Collectors.toMap(ProcessTaskRelationLog::getPostTaskCode, processTaskRelationLog -> processTaskRelationLog));
+                for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
+                    if (!processTaskRelationLog.equals(taskRelationLogMap.get(processTaskRelationLog.getPostTaskCode()))) {
+                        isChange = true;
+                        break;
+                    }
+                }
+            } else {
+                isChange = true;
+            }
         } else {
-            processDefinition.setUpdateTime(new Date());
-            insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
-        }
-        if (insertVersion == 0) {
-            putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
-            throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
+            isChange = true;
         }
-        int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
-            processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
-        if (insertResult == Constants.EXIT_CODE_SUCCESS) {
+        if (isChange) {
+            processDefinition.setUpdateTime(new Date());
+            int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
+            if (insertVersion <= 0) {
+                putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+                throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
+            }
+            int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
+                processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
+            if (insertResult == Constants.EXIT_CODE_SUCCESS) {
+                putMsg(result, Status.SUCCESS);
+                result.put(Constants.DATA_LIST, processDefinition);
+            } else {
+                putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+                throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
+            }
+        } else {
             putMsg(result, Status.SUCCESS);
             result.put(Constants.DATA_LIST, processDefinition);
-        } else {
-            putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
-            throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
         return result;
     }

+ 129 - 236
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java

@@ -17,8 +17,6 @@
 
 package org.apache.dolphinscheduler.api.service.impl;
 
-import static org.apache.dolphinscheduler.api.enums.Status.DATA_IS_NOT_VALID;
-
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
@@ -26,7 +24,6 @@ import org.apache.dolphinscheduler.api.service.ProjectService;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ConditionType;
 import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
@@ -40,13 +37,14 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
 import org.apache.commons.collections.CollectionUtils;
 
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.HashSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -57,8 +55,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Lists;
 
 /**
@@ -88,6 +84,9 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
     @Autowired
     private ProcessDefinitionMapper processDefinitionMapper;
 
+    @Autowired
+    private ProcessService processService;
+
     /**
      * create process task relation
      *
@@ -135,6 +134,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
                 }
             }
         }
+        updateProcessDefiniteVersion(loginUser, result, processDefinition);
         Date now = new Date();
         List<ProcessTaskRelationLog> processTaskRelationLogs = new ArrayList<>();
         if (preTaskCode != 0L) {
@@ -186,108 +186,13 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
         return processTaskRelationLog;
     }
 
-    /**
-     * move task to other processDefinition
-     *
-     * @param loginUser                   login user info
-     * @param projectCode                 project code
-     * @param processDefinitionCode       process definition code
-     * @param targetProcessDefinitionCode target process definition code
-     * @param taskCode                    the current task code (the post task code)
-     * @return move result code
-     */
-    @Transactional(rollbackFor = RuntimeException.class)
-    @Override
-    public Map<String, Object> moveTaskProcessRelation(User loginUser, long projectCode, long processDefinitionCode, long targetProcessDefinitionCode, long taskCode) {
-        Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
-        if (result.get(Constants.STATUS) != Status.SUCCESS) {
-            return result;
+    private void updateProcessDefiniteVersion(User loginUser, Map<String, Object> result, ProcessDefinition processDefinition) {
+        int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
+        if (insertVersion <= 0) {
+            putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+            throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
-        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(targetProcessDefinitionCode);
-        if (processDefinition == null) {
-            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, targetProcessDefinitionCode);
-            return result;
-        }
-        if (processDefinition.getProjectCode() != projectCode) {
-            putMsg(result, Status.PROJECT_PROCESS_NOT_MATCH);
-            return result;
-        }
-        List<ProcessTaskRelation> downstreamList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, taskCode, 0L);
-        if (CollectionUtils.isNotEmpty(downstreamList)) {
-            Set<Long> postTaskCodes = downstreamList
-                .stream()
-                .map(ProcessTaskRelation::getPostTaskCode)
-                .collect(Collectors.toSet());
-            putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(postTaskCodes, ","));
-            return result;
-        }
-        List<ProcessTaskRelation> upstreamList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, taskCode);
-        if (upstreamList.isEmpty()) {
-            putMsg(result, Status.PROCESS_TASK_RELATION_NOT_EXIST, "taskCode:" + taskCode);
-            return result;
-        } else {
-            Set<Long> preTaskCodes = upstreamList
-                .stream()
-                .map(ProcessTaskRelation::getPreTaskCode)
-                .collect(Collectors.toSet());
-            if (preTaskCodes.size() > 1 || !preTaskCodes.contains(0L)) {
-                putMsg(result, Status.TASK_HAS_UPSTREAM, org.apache.commons.lang.StringUtils.join(preTaskCodes, ","));
-                return result;
-            }
-        }
-        TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
-        if (null == taskDefinition) {
-            putMsg(result, Status.DATA_IS_NULL, "taskDefinition");
-            return result;
-        }
-        ObjectNode paramNode = JSONUtils.parseObject(taskDefinition.getTaskParams());
-        if (TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType())) {
-            Set<Long> depProcessDefinitionCodes = new HashSet<>();
-            ObjectNode dependence = (ObjectNode) paramNode.get("dependence");
-            ArrayNode dependTaskList = JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList")));
-            for (int i = 0; i < dependTaskList.size(); i++) {
-                ObjectNode dependTask = (ObjectNode) dependTaskList.path(i);
-                ArrayNode dependItemList = JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList")));
-                for (int j = 0; j < dependItemList.size(); j++) {
-                    ObjectNode dependItem = (ObjectNode) dependItemList.path(j);
-                    long definitionCode = dependItem.get("definitionCode").asLong();
-                    depProcessDefinitionCodes.add(definitionCode);
-                }
-            }
-            if (depProcessDefinitionCodes.contains(targetProcessDefinitionCode)) {
-                putMsg(result, DATA_IS_NOT_VALID, "targetProcessDefinitionCode");
-                return result;
-            }
-        }
-        if (TaskType.SUB_PROCESS.getDesc().equals(taskDefinition.getTaskType())) {
-            long subProcessDefinitionCode = paramNode.get("processDefinitionCode").asLong();
-            if (targetProcessDefinitionCode == subProcessDefinitionCode) {
-                putMsg(result, DATA_IS_NOT_VALID, "targetProcessDefinitionCode");
-                return result;
-            }
-        }
-        Date now = new Date();
-        ProcessTaskRelation processTaskRelation = upstreamList.get(0);
-        ProcessTaskRelationLog processTaskRelationLog = processTaskRelationLogMapper.queryRelationLogByRelation(processTaskRelation);
-        processTaskRelation.setProcessDefinitionCode(processDefinition.getCode());
-        processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion());
-        processTaskRelation.setUpdateTime(now);
-        processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode());
-        processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
-        processTaskRelationLog.setUpdateTime(now);
-        processTaskRelationLog.setOperator(loginUser.getId());
-        processTaskRelationLog.setOperateTime(now);
-        int update = processTaskRelationMapper.updateById(processTaskRelation);
-        int updateLog = processTaskRelationLogMapper.updateById(processTaskRelationLog);
-        if (update == 0 || updateLog == 0) {
-            putMsg(result, Status.MOVE_PROCESS_TASK_RELATION_ERROR);
-            throw new ServiceException(Status.MOVE_PROCESS_TASK_RELATION_ERROR);
-        } else {
-            putMsg(result, Status.SUCCESS);
-        }
-        return result;
+        processDefinition.setVersion(insertVersion);
     }
 
     /**
@@ -322,28 +227,25 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
             putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
             return result;
         }
-        List<ProcessTaskRelation> downstreamList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, taskCode, 0L);
-        if (CollectionUtils.isNotEmpty(downstreamList)) {
-            Set<Long> postTaskCodes = downstreamList
-                .stream()
-                .map(ProcessTaskRelation::getPostTaskCode)
-                .collect(Collectors.toSet());
-            putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(postTaskCodes, ","));
+        List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
+        if (CollectionUtils.isEmpty(processTaskRelationList)) {
+            putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList");
             return result;
         }
-
-        ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
-        processTaskRelationLog.setProjectCode(projectCode);
-        processTaskRelationLog.setPostTaskCode(taskCode);
-        processTaskRelationLog.setPostTaskVersion(taskDefinition.getVersion());
-        processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
-        processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
-        int deleteRelation = processTaskRelationMapper.deleteRelation(processTaskRelationLog);
-        int deleteRelationLog = processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
-        if (0 == deleteRelation || 0 == deleteRelationLog) {
-            putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR);
-            throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
+        List<Long> downstreamList = Lists.newArrayList();
+        for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
+            if (processTaskRelation.getPreTaskCode() == taskCode) {
+                downstreamList.add(processTaskRelation.getPostTaskCode());
+            }
+            if (processTaskRelation.getPostTaskCode() == taskCode) {
+                processTaskRelationList.remove(processTaskRelation);
+            }
         }
+        if (CollectionUtils.isNotEmpty(downstreamList)) {
+            putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(downstreamList, ","));
+            return result;
+        }
+        updateRelation(loginUser, result, processDefinition, processTaskRelationList);
         if (TaskType.CONDITIONS.getDesc().equals(taskDefinition.getTaskType())
             || TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType())
             || TaskType.SUB_PROCESS.getDesc().equals(taskDefinition.getTaskType())) {
@@ -357,6 +259,21 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
         return result;
     }
 
+    private void updateRelation(User loginUser, Map<String, Object> result, ProcessDefinition processDefinition,
+                                List<ProcessTaskRelation> processTaskRelationList) {
+        updateProcessDefiniteVersion(loginUser, result, processDefinition);
+        List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+        int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(),
+            processDefinition.getVersion(), relationLogs, Lists.newArrayList(), Boolean.TRUE);
+        if (insertResult == Constants.EXIT_CODE_SUCCESS) {
+            putMsg(result, Status.SUCCESS);
+            result.put(Constants.DATA_LIST, processDefinition);
+        } else {
+            putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+            throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
+        }
+    }
+
     /**
      * delete task upstream relation
      *
@@ -379,11 +296,42 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
             putMsg(result, Status.DATA_IS_NULL, "preTaskCodes");
             return result;
         }
-        Status status = deleteUpstreamRelation(loginUser.getId(), projectCode,
-            Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).distinct().toArray(Long[]::new), taskCode);
-        if (status != Status.SUCCESS) {
-            putMsg(result, status);
+        List<ProcessTaskRelation> upstreamList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
+        if (CollectionUtils.isEmpty(upstreamList)) {
+            putMsg(result, Status.DATA_IS_NULL, "taskCode");
+            return result;
+        }
+
+        List<Long> preTaskCodeList = Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toList());
+        if (preTaskCodeList.contains(0L)) {
+            putMsg(result, Status.DATA_IS_NULL, "preTaskCodes");
+            return result;
+        }
+        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(upstreamList.get(0).getProcessDefinitionCode());
+        if (processDefinition == null) {
+            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, upstreamList.get(0).getProcessDefinitionCode());
+            return result;
         }
+        List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
+        List<ProcessTaskRelation> processTaskRelationWaitRemove = Lists.newArrayList();
+        for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
+            if (preTaskCodeList.size() > 1) {
+                if (preTaskCodeList.contains(processTaskRelation.getPreTaskCode())) {
+                    preTaskCodeList.remove(processTaskRelation.getPreTaskCode());
+                    processTaskRelationWaitRemove.add(processTaskRelation);
+                }
+            } else {
+                if (processTaskRelation.getPostTaskCode() == taskCode) {
+                    processTaskRelation.setPreTaskVersion(0);
+                    processTaskRelation.setPreTaskCode(0L);
+                }
+            }
+            if (preTaskCodeList.contains(processTaskRelation.getPostTaskCode())) {
+                processTaskRelationWaitRemove.add(processTaskRelation);
+            }
+        }
+        processTaskRelationList.removeAll(processTaskRelationWaitRemove);
+        updateRelation(loginUser, result, processDefinition, processTaskRelationList);
         return result;
     }
 
@@ -409,26 +357,24 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
             putMsg(result, Status.DATA_IS_NULL, "postTaskCodes");
             return result;
         }
-        List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode);
-        Map<Long, ProcessTaskRelationLog> taskRelationLogMap =
-            processTaskRelationList.stream()
-                .map(ProcessTaskRelationLog::new)
-                .collect(Collectors.toMap(ProcessTaskRelationLog::getPostTaskCode, processTaskRelationLog -> processTaskRelationLog));
-        Set<Long> postTaskCodesSet = Lists.newArrayList(postTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
-        int delete = 0;
-        int deleteLog = 0;
-        for (long postTaskCode : postTaskCodesSet) {
-            ProcessTaskRelationLog processTaskRelationLog = taskRelationLogMap.get(postTaskCode);
-            if (processTaskRelationLog != null) {
-                delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
-                deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
-            }
+        List<ProcessTaskRelation> downstreamList = processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode);
+        if (CollectionUtils.isEmpty(downstreamList)) {
+            putMsg(result, Status.DATA_IS_NULL, "taskCode");
+            return result;
         }
-        if ((delete & deleteLog) == 0) {
-            throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
-        } else {
-            putMsg(result, Status.SUCCESS);
+        List<Long> postTaskCodeList = Lists.newArrayList(postTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toList());
+        if (postTaskCodeList.contains(0L)) {
+            putMsg(result, Status.DATA_IS_NULL, "postTaskCodes");
+            return result;
         }
+        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(downstreamList.get(0).getProcessDefinitionCode());
+        if (processDefinition == null) {
+            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, downstreamList.get(0).getProcessDefinitionCode());
+            return result;
+        }
+        List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
+        processTaskRelationList.removeIf(processTaskRelation -> postTaskCodeList.contains(processTaskRelation.getPostTaskCode()) && processTaskRelation.getPreTaskCode() == taskCode);
+        updateRelation(loginUser, result, processDefinition, processTaskRelationList);
         return result;
     }
 
@@ -514,6 +460,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
      * @param postTaskCode          post task code
      * @return delete result code
      */
+    @Transactional(rollbackFor = RuntimeException.class)
     @Override
     public Map<String, Object> deleteEdge(User loginUser, long projectCode, long processDefinitionCode, long preTaskCode, long postTaskCode) {
         Project project = projectMapper.queryByCode(projectCode);
@@ -522,36 +469,49 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
-        List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode);
+        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
+        if (processDefinition == null) {
+            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
+            return result;
+        }
+        List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
         if (CollectionUtils.isEmpty(processTaskRelationList)) {
             putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList");
             return result;
         }
-        if (processTaskRelationList.size() > 1) {
-            putMsg(result, Status.DATA_IS_NOT_VALID, "processTaskRelationList");
-            return result;
+        Map<Long, List<ProcessTaskRelation>> taskRelationMap = new HashMap<>();
+        for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
+            taskRelationMap.compute(processTaskRelation.getPostTaskCode(), (k, v) -> {
+                if (v == null) {
+                    v = new ArrayList<>();
+                }
+                v.add(processTaskRelation);
+                return v;
+            });
         }
-        ProcessTaskRelation processTaskRelation = processTaskRelationList.get(0);
-        int upstreamCount = processTaskRelationMapper.countByCode(projectCode, processTaskRelation.getProcessDefinitionCode(),
-            0L, processTaskRelation.getPostTaskCode());
-
-        if (upstreamCount == 0) {
-            putMsg(result, Status.DATA_IS_NULL, "upstreamCount");
+        if (!taskRelationMap.containsKey(postTaskCode)) {
+            putMsg(result, Status.DATA_IS_NULL, "postTaskCode");
             return result;
         }
-        if (upstreamCount > 1) {
-            int delete = processTaskRelationMapper.deleteById(processTaskRelation.getId());
-            if (delete == 0) {
-                putMsg(result, Status.DELETE_EDGE_ERROR);
+        if (taskRelationMap.get(postTaskCode).size() > 1) {
+            for (ProcessTaskRelation processTaskRelation : taskRelationMap.get(postTaskCode)) {
+                if (processTaskRelation.getPreTaskCode() == preTaskCode) {
+                    int delete = processTaskRelationMapper.deleteById(processTaskRelation.getId());
+                    if (delete == 0) {
+                        putMsg(result, Status.DELETE_EDGE_ERROR);
+                        throw new ServiceException(Status.DELETE_EDGE_ERROR);
+                    }
+                    processTaskRelationList.remove(processTaskRelation);
+                }
             }
-            return result;
-        }
-        processTaskRelation.setPreTaskVersion(0);
-        processTaskRelation.setPreTaskCode(0L);
-        int update = processTaskRelationMapper.updateById(processTaskRelation);
-        if (update == 0) {
-            putMsg(result, Status.DELETE_EDGE_ERROR);
+        } else {
+            ProcessTaskRelation processTaskRelation = taskRelationMap.get(postTaskCode).get(0);
+            processTaskRelationList.remove(processTaskRelation);
+            processTaskRelation.setPreTaskVersion(0);
+            processTaskRelation.setPreTaskCode(0L);
+            processTaskRelationList.add(processTaskRelation);
         }
+        updateRelation(loginUser, result, processDefinition, processTaskRelationList);
         return result;
     }
 
@@ -583,71 +543,4 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
             }
         };
     }
-
-    /**
-     * delete upstream relation
-     *
-     * @param projectCode  project code
-     * @param preTaskCodes pre task codes
-     * @param taskCode     pre task code
-     * @return status
-     */
-    private Status deleteUpstreamRelation(int userId, long projectCode, Long[] preTaskCodes, long taskCode) {
-        List<ProcessTaskRelation> upstreamList = processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, preTaskCodes);
-        if (CollectionUtils.isEmpty(upstreamList)) {
-            return Status.SUCCESS;
-        }
-        List<ProcessTaskRelationLog> upstreamLogList = new ArrayList<>();
-        Date now = new Date();
-        for (ProcessTaskRelation processTaskRelation : upstreamList) {
-            ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
-            processTaskRelationLog.setOperator(userId);
-            processTaskRelationLog.setOperateTime(now);
-            processTaskRelationLog.setUpdateTime(now);
-            upstreamLogList.add(processTaskRelationLog);
-        }
-        Map<Long, List<ProcessTaskRelationLog>> processTaskRelationListGroupByProcessDefinitionCode = upstreamLogList.stream()
-            .collect(Collectors.groupingBy(ProcessTaskRelationLog::getProcessDefinitionCode));
-        // count upstream relation group by process definition code
-        List<Map<String, Long>> countListGroupByProcessDefinitionCode = processTaskRelationMapper
-            .countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, processTaskRelationListGroupByProcessDefinitionCode.keySet().toArray(new Long[0]), taskCode);
-
-        List<ProcessTaskRelationLog> deletes = new ArrayList<>();
-        List<ProcessTaskRelationLog> updates = new ArrayList<>();
-        for (Map<String, Long> codeCountMap : countListGroupByProcessDefinitionCode) {
-            long processDefinitionCode = codeCountMap.get("processDefinitionCode");
-            long countValue = codeCountMap.get("countValue");
-            List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationListGroupByProcessDefinitionCode.get(processDefinitionCode);
-            if (countValue <= processTaskRelationLogList.size()) {
-                ProcessTaskRelationLog processTaskRelationLog = processTaskRelationLogList.remove(0);
-                if (processTaskRelationLog.getPreTaskCode() != 0) {
-                    processTaskRelationLog.setPreTaskCode(0);
-                    processTaskRelationLog.setPreTaskVersion(0);
-                    updates.add(processTaskRelationLog);
-                }
-            }
-            if (!processTaskRelationLogList.isEmpty()) {
-                deletes.addAll(processTaskRelationLogList);
-            }
-        }
-        deletes.addAll(updates);
-        int delete = 0;
-        int deleteLog = 0;
-        for (ProcessTaskRelationLog processTaskRelationLog : deletes) {
-            delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
-            deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
-        }
-        if ((delete & deleteLog) == 0) {
-            throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
-        } else {
-            if (!updates.isEmpty()) {
-                int insert = processTaskRelationMapper.batchInsert(updates);
-                int insertLog = processTaskRelationLogMapper.batchInsert(updates);
-                if ((insert & insertLog) == 0) {
-                    throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
-                }
-            }
-        }
-        return Status.SUCCESS;
-    }
 }

+ 59 - 37
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@@ -279,6 +279,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
      * @param projectCode project code
      * @param taskCode task code
      */
+    @Transactional(rollbackFor = RuntimeException.class)
     @Override
     public Map<String, Object> deleteTaskDefinitionByCode(User loginUser, long projectCode, long taskCode) {
         Project project = projectMapper.queryByCode(projectCode);
@@ -314,23 +315,35 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
             List<ProcessTaskRelation> taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
             if (!taskRelationList.isEmpty()) {
                 int deleteRelation = 0;
-                int deleteRelationLog = 0;
                 for (ProcessTaskRelation processTaskRelation : taskRelationList) {
-                    ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
-                    deleteRelation += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
-                    deleteRelationLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
+                    deleteRelation += processTaskRelationMapper.deleteById(processTaskRelation.getId());
                 }
-                if ((deleteRelation & deleteRelationLog) == 0) {
+                if (deleteRelation == 0) {
                     throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
                 }
+                long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
+                updateProcessDefiniteVersion(loginUser, processDefinitionCode);
             }
             putMsg(result, Status.SUCCESS);
         } else {
             putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
+            throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
         }
         return result;
     }
 
+    private int updateProcessDefiniteVersion(User loginUser, long processDefinitionCode) {
+        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
+        if (processDefinition == null) {
+            throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST);
+        }
+        int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
+        if (insertVersion <= 0) {
+            throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
+        }
+        return insertVersion;
+    }
+
     /**
      * update task definition
      *
@@ -347,37 +360,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
         if (version <= 0) {
             return result;
         }
-        List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
-        if (!processTaskRelationList.isEmpty()) {
-            List<ProcessTaskRelationLog> processTaskRelationLogList = new ArrayList<>();
-            int delete = 0;
-            int deleteLog = 0;
-            Date now = new Date();
-            for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
-                ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
-                delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
-                deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
-                if (processTaskRelationLog.getPreTaskCode() == taskCode) {
-                    processTaskRelationLog.setPreTaskVersion(version);
-                }
-                if (processTaskRelationLog.getPostTaskCode() == taskCode) {
-                    processTaskRelationLog.setPostTaskVersion(version);
-                }
-                processTaskRelationLog.setOperator(loginUser.getId());
-                processTaskRelationLog.setOperateTime(now);
-                processTaskRelationLog.setUpdateTime(now);
-                processTaskRelationLogList.add(processTaskRelationLog);
-            }
-            if ((delete & deleteLog) == 0) {
-                throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
-            } else {
-                int insertRelation = processTaskRelationMapper.batchInsert(processTaskRelationLogList);
-                int insertRelationLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogList);
-                if ((insertRelation & insertRelationLog) == 0) {
-                    throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
-                }
-            }
-        }
+        handleRelation(loginUser, taskCode, version);
         result.put(Constants.DATA_LIST, taskCode);
         putMsg(result, Status.SUCCESS);
         return result;
@@ -439,6 +422,41 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
         return version;
     }
 
+    private void handleRelation(User loginUser, long taskCode, Integer version) {
+        List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
+        if (!processTaskRelationList.isEmpty()) {
+            long processDefinitionCode = processTaskRelationList.get(0).getProcessDefinitionCode();
+            int definiteVersion = updateProcessDefiniteVersion(loginUser, processDefinitionCode);
+            List<ProcessTaskRelationLog> processTaskRelationLogList = new ArrayList<>();
+            int delete = 0;
+            Date now = new Date();
+            for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
+                ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
+                delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
+                if (processTaskRelationLog.getPreTaskCode() == taskCode) {
+                    processTaskRelationLog.setPreTaskVersion(version);
+                }
+                if (processTaskRelationLog.getPostTaskCode() == taskCode) {
+                    processTaskRelationLog.setPostTaskVersion(version);
+                }
+                processTaskRelationLog.setProcessDefinitionVersion(definiteVersion);
+                processTaskRelationLog.setOperator(loginUser.getId());
+                processTaskRelationLog.setOperateTime(now);
+                processTaskRelationLog.setUpdateTime(now);
+                processTaskRelationLogList.add(processTaskRelationLog);
+            }
+            if (delete == 0) {
+                throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
+            } else {
+                int insertRelation = processTaskRelationMapper.batchInsert(processTaskRelationLogList);
+                int insertRelationLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogList);
+                if ((insertRelation & insertRelationLog) == 0) {
+                    throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+                }
+            }
+        }
+    }
+
     /**
      * update task definition and upstream
      *
@@ -476,6 +494,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
             throw new ServiceException(Status.PROCESS_TASK_RELATION_NOT_EXIST);
         }
         if (!processTaskRelationList.isEmpty()) {
+            long processDefinitionCode = processTaskRelationList.get(0).getProcessDefinitionCode();
+            int definiteVersion = updateProcessDefiniteVersion(loginUser, processDefinitionCode);
             List<ProcessTaskRelationLog> relationLogs = new ArrayList<>();
             Date now = new Date();
             int delete = 0;
@@ -498,6 +518,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
                         processTaskRelationLog.setPreTaskVersion(0);
                     }
                 }
+                processTaskRelationLog.setProcessDefinitionVersion(definiteVersion);
                 relationLogs.add(processTaskRelationLog);
             }
             if ((delete & deleteLog) == 0) {
@@ -560,6 +581,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
         taskDefinitionUpdate.setId(taskDefinition.getId());
         int switchVersion = taskDefinitionMapper.updateById(taskDefinitionUpdate);
         if (switchVersion > 0) {
+            handleRelation(loginUser, taskCode, version);
             result.put(Constants.DATA_LIST, taskCode);
             putMsg(result, Status.SUCCESS);
         } else {
@@ -768,7 +790,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
                     }
                 }
                 taskDefinition.setFlag(Flag.YES);
-                taskDefinitionLog.setFlag(Flag.NO);
+                taskDefinitionLog.setFlag(Flag.YES);
                 break;
             default:
                 putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE);

+ 1 - 0
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@@ -727,6 +727,7 @@ public class ProcessDefinitionServiceTest {
         processDefinition.setTenantId(1);
         processDefinition.setDescription("");
         processDefinition.setCode(46L);
+        processDefinition.setVersion(1);
         return processDefinition;
     }
 

+ 45 - 93
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java

@@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import org.apache.commons.collections.CollectionUtils;
 
@@ -88,6 +89,9 @@ public class ProcessTaskRelationServiceTest {
     @Mock
     private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
 
+    @Mock
+    private ProcessService processService;
+
     /**
      * get Mock Admin User
      *
@@ -285,48 +289,6 @@ public class ProcessTaskRelationServiceTest {
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
     }
 
-    @Test
-    public void testMoveTaskProcessRelation() {
-        long projectCode = 1L;
-        long processDefinitionCode = 1L;
-        long taskCode = 1L;
-
-        Project project = getProject(projectCode);
-        Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
-
-        User loginUser = new User();
-        loginUser.setId(-1);
-        loginUser.setUserType(UserType.GENERAL_USER);
-
-        Map<String, Object> result = new HashMap<>();
-        putMsg(result, Status.SUCCESS, projectCode);
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
-        Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(getProcessDefinition());
-        Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, taskCode, 0L)).thenReturn(Lists.newArrayList());
-        Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(getTaskDefinition());
-        List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList();
-        ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
-        processTaskRelation.setProjectCode(projectCode);
-        processTaskRelation.setProcessDefinitionCode(processDefinitionCode);
-        processTaskRelation.setPreTaskCode(0L);
-        processTaskRelation.setPreTaskVersion(0);
-        processTaskRelation.setPostTaskCode(taskCode);
-        processTaskRelation.setPostTaskVersion(1);
-        processTaskRelationList.add(processTaskRelation);
-        ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
-        processTaskRelationLog.setProjectCode(projectCode);
-        processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
-        processTaskRelationLog.setPreTaskCode(0L);
-        processTaskRelationLog.setPreTaskVersion(0);
-        processTaskRelationLog.setPostTaskCode(taskCode);
-        processTaskRelationLog.setPostTaskVersion(1);
-        Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, taskCode)).thenReturn(processTaskRelationList);
-        Mockito.when(processTaskRelationLogMapper.queryRelationLogByRelation(processTaskRelation)).thenReturn(processTaskRelationLog);
-        Mockito.when(processTaskRelationMapper.updateById(processTaskRelation)).thenReturn(1);
-        Mockito.when(processTaskRelationLogMapper.updateById(processTaskRelationLog)).thenReturn(1);
-        Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
-    }
-
     @Test
     public void testQueryDownstreamRelation() {
         long projectCode = 1L;
@@ -456,6 +418,9 @@ public class ProcessTaskRelationServiceTest {
         ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
         Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
         Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
+        ProcessDefinition processDefinition = getProcessDefinition();
+        Mockito.when(processDefinitionMapper.queryByCode(1L)).thenReturn(processDefinition);
+        Mockito.when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE)).thenReturn(1);
         Map<String, Object> result1 = processTaskRelationService.deleteDownstreamRelation(loginUser, projectCode, "123", taskCode);
         Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS));
     }
@@ -472,38 +437,23 @@ public class ProcessTaskRelationServiceTest {
         loginUser.setUserType(UserType.GENERAL_USER);
         Map<String, Object> result = new HashMap<>();
         putMsg(result, Status.SUCCESS, projectCode);
+        List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList();
+        ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+        processTaskRelation.setProjectCode(projectCode);
+        processTaskRelation.setProcessDefinitionCode(1L);
+        processTaskRelation.setPreTaskCode(0L);
+        processTaskRelation.setPreTaskVersion(0);
+        processTaskRelation.setPostTaskCode(taskCode);
+        processTaskRelation.setPostTaskVersion(1);
+        processTaskRelationList.add(processTaskRelation);
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
-        Mockito.when(processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, new Long[]{123L})).thenReturn(Lists.newArrayList());
-        List<Map<String, Long>> countListGroupByProcessDefinitionCode = new ArrayList<>();
-        countListGroupByProcessDefinitionCode.add(new HashMap<String, Long>() {
-            {
-                put("processDefinitionCode", 123L);
-                put("countValue", 2L);
-            }
-        });
-        countListGroupByProcessDefinitionCode.add(new HashMap<String, Long>() {
-            {
-                put("processDefinitionCode", 124L);
-                put("countValue", 1L);
-            }
-        });
-        countListGroupByProcessDefinitionCode.add(new HashMap<String, Long>() {
-            {
-                put("processDefinitionCode", 125L);
-                put("countValue", 3L);
-            }
-        });
-        ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
-        processTaskRelationLog.setProjectCode(projectCode);
-        processTaskRelationLog.setPreTaskCode(0L);
-        processTaskRelationLog.setPreTaskVersion(0);
-        processTaskRelationLog.setPostTaskCode(taskCode);
-        processTaskRelationLog.setPostTaskVersion(2);
-        Mockito.when(processTaskRelationMapper.countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, new Long[]{123L, 124L, 125L}, 2)).thenReturn(countListGroupByProcessDefinitionCode);
-        Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
-        Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
-        Map<String, Object> result1 = processTaskRelationService.deleteUpstreamRelation(loginUser, projectCode, "123", taskCode);
-        Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS));
+        Mockito.when(processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode)).thenReturn(processTaskRelationList);
+        Mockito.when(processDefinitionMapper.queryByCode(1L)).thenReturn(getProcessDefinition());
+        Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 1L)).thenReturn(processTaskRelationList);
+        List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+        Mockito.when(processService.saveTaskRelation(loginUser, 1L, 1L,
+            1, relationLogs, Lists.newArrayList(), Boolean.TRUE)).thenReturn(0);
+        Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
     }
 
     @Test
@@ -523,25 +473,24 @@ public class ProcessTaskRelationServiceTest {
         putMsg(result, Status.SUCCESS, projectCode);
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
         Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode)).thenReturn(Lists.newArrayList());
-        ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
-        processTaskRelationLog.setProjectCode(projectCode);
-        processTaskRelationLog.setPreTaskCode(taskCode);
-        processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
-        Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
         Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(getProcessDefinition());
         Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(getTaskDefinition());
         TaskDefinition taskDefinition = new TaskDefinition();
         taskDefinition.setTaskType(TaskType.CONDITIONS.getDesc());
         Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(taskDefinition);
-        Mockito.when(taskDefinitionMapper.deleteByCode(taskCode)).thenReturn(1);
-        processTaskRelationLog = new ProcessTaskRelationLog();
-        processTaskRelationLog.setProjectCode(projectCode);
-        processTaskRelationLog.setPostTaskCode(taskCode);
-        processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
-        processTaskRelationLog.setProcessDefinitionVersion(1);
-        Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
-        Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
-        result = processTaskRelationService.deleteTaskProcessRelation(loginUser, projectCode, processDefinitionCode, taskCode);
+        List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList();
+        ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+        processTaskRelation.setProjectCode(projectCode);
+        processTaskRelation.setProcessDefinitionCode(1L);
+        processTaskRelation.setPreTaskCode(0L);
+        processTaskRelation.setPreTaskVersion(0);
+        processTaskRelation.setPostTaskCode(taskCode);
+        processTaskRelation.setPostTaskVersion(1);
+        processTaskRelationList.add(processTaskRelation);
+        Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode)).thenReturn(processTaskRelationList);
+        List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+        Mockito.when(processService.saveTaskRelation(loginUser, 1L, 1L,
+            1, relationLogs, Lists.newArrayList(), Boolean.TRUE)).thenReturn(0);
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
     }
 
@@ -549,7 +498,7 @@ public class ProcessTaskRelationServiceTest {
     public void testDeleteEdge() {
         long projectCode = 1L;
         long processDefinitionCode = 3L;
-        long preTaskCode = 4L;
+        long preTaskCode = 0L;
         long postTaskCode = 5L;
         Project project = getProject(projectCode);
         Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
@@ -563,15 +512,18 @@ public class ProcessTaskRelationServiceTest {
         ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
         processTaskRelation.setProjectCode(projectCode);
         processTaskRelation.setProcessDefinitionCode(processDefinitionCode);
+        processTaskRelation.setProcessDefinitionVersion(1);
         processTaskRelation.setPreTaskCode(preTaskCode);
         processTaskRelation.setPostTaskCode(postTaskCode);
+        ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
+        processTaskRelationLog.setOperator(loginUser.getId());
         List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
         processTaskRelationList.add(processTaskRelation);
-        Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode)).thenReturn(processTaskRelationList);
-        Mockito.when(processTaskRelationMapper.countByCode(projectCode, processDefinitionCode, 0L, postTaskCode)).thenReturn(1);
-        Mockito.when(processTaskRelationMapper.deleteById(processTaskRelation.getId())).thenReturn(1);
-        Mockito.when(processTaskRelationMapper.updateById(processTaskRelation)).thenReturn(1);
-        result = processTaskRelationService.deleteEdge(loginUser, projectCode, processDefinitionCode, preTaskCode, postTaskCode);
+        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
+        Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 1L)).thenReturn(processTaskRelationList);
+        List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+        Mockito.when(processService.saveTaskRelation(loginUser, 1L, 1L,
+            1, relationLogs, Lists.newArrayList(), Boolean.TRUE)).thenReturn(0);
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
     }
 }

+ 31 - 2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@@ -2250,7 +2250,7 @@ public class ProcessService {
         if (result > 0) {
             result = switchProcessTaskRelationVersion(processDefinitionLog);
             if (result <= 0) {
-                return Constants.DEFINITION_FAILURE;
+                return Constants.EXIT_CODE_FAILURE;
             }
         }
         return result;
@@ -2262,7 +2262,36 @@ public class ProcessService {
             processTaskRelationMapper.deleteByCode(processDefinition.getProjectCode(), processDefinition.getCode());
         }
         List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
-        return processTaskRelationMapper.batchInsert(processTaskRelationLogList);
+        int batchInsert = processTaskRelationMapper.batchInsert(processTaskRelationLogList);
+        if (batchInsert == 0) {
+            return Constants.EXIT_CODE_FAILURE;
+        } else {
+            int result = 0;
+            for (ProcessTaskRelationLog taskRelationLog : processTaskRelationLogList) {
+                int switchResult = switchTaskDefinitionVersion(taskRelationLog.getPostTaskCode(), taskRelationLog.getPostTaskVersion());
+                if (switchResult != Constants.EXIT_CODE_FAILURE) {
+                    result++;
+                }
+            }
+            return result;
+        }
+    }
+
+    public int switchTaskDefinitionVersion(long taskCode, int taskVersion) {
+        TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
+        if (taskDefinition == null) {
+            return Constants.EXIT_CODE_FAILURE;
+        }
+        if (taskDefinition.getVersion() == taskVersion) {
+            return Constants.EXIT_CODE_SUCCESS;
+        }
+        TaskDefinitionLog taskDefinitionUpdate = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskVersion);
+        if (taskDefinitionUpdate == null) {
+            return Constants.EXIT_CODE_FAILURE;
+        }
+        taskDefinitionUpdate.setUpdateTime(new Date());
+        taskDefinitionUpdate.setId(taskDefinition.getId());
+        return taskDefinitionMapper.updateById(taskDefinitionUpdate);
     }
 
     /**