Browse Source

[Bug] [dolphinscheduler-api]support workflow instance to definition (#7930)

* add task save and  binds workflow

* add task update with upstream

* support workflow instance to definition

* fix ut
JinYong Li 3 years ago
parent
commit
24a97fda75

+ 6 - 7
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java

@@ -177,13 +177,13 @@ public class ProcessInstanceController extends BaseController {
     @ApiImplicitParams({
         @ApiImplicitParam(name = "taskRelationJson", value = "TASK_RELATION_JSON", type = "String"),
         @ApiImplicitParam(name = "taskDefinitionJson", value = "TASK_DEFINITION_JSON", type = "String"),
-        @ApiImplicitParam(name = "id", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"),
+        @ApiImplicitParam(name = "id", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "1"),
         @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", type = "String"),
-        @ApiImplicitParam(name = "syncDefine", value = "SYNC_DEFINE", required = true, type = "Boolean"),
-        @ApiImplicitParam(name = "globalParams", value = "PROCESS_GLOBAL_PARAMS", type = "String"),
+        @ApiImplicitParam(name = "syncDefine", value = "SYNC_DEFINE", required = true, type = "Boolean", example = "false"),
+        @ApiImplicitParam(name = "globalParams", value = "PROCESS_GLOBAL_PARAMS", type = "String", example = "[]"),
         @ApiImplicitParam(name = "locations", value = "PROCESS_INSTANCE_LOCATIONS", type = "String"),
-        @ApiImplicitParam(name = "timeout", value = "PROCESS_TIMEOUT", type = "String"),
-        @ApiImplicitParam(name = "tenantCode", value = "TENANT_CODE", type = "Int", example = "0")
+        @ApiImplicitParam(name = "timeout", value = "PROCESS_TIMEOUT", type = "Int", example = "0"),
+        @ApiImplicitParam(name = "tenantCode", value = "TENANT_CODE", type = "String", example = "default")
     })
     @PutMapping(value = "/{id}")
     @ResponseStatus(HttpStatus.OK)
@@ -199,8 +199,7 @@ public class ProcessInstanceController extends BaseController {
                                         @RequestParam(value = "globalParams", required = false, defaultValue = "[]") String globalParams,
                                         @RequestParam(value = "locations", required = false) String locations,
                                         @RequestParam(value = "timeout", required = false, defaultValue = "0") int timeout,
-                                        @RequestParam(value = "tenantCode", required = true) String tenantCode,
-                                        @RequestParam(value = "flag", required = false) Flag flag) {
+                                        @RequestParam(value = "tenantCode", required = true) String tenantCode) {
         Map<String, Object> result = processInstanceService.updateProcessInstance(loginUser, projectCode, id,
             taskRelationJson, taskDefinitionJson, scheduleTime, syncDefine, globalParams, locations, timeout, tenantCode);
         return returnDataList(result);

+ 10 - 9
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@@ -263,7 +263,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                                                 ProcessDefinition processDefinition,
                                                 List<TaskDefinitionLog> taskDefinitionLogs) {
         Map<String, Object> result = new HashMap<>();
-        int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(), taskDefinitionLogs);
+        int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(), taskDefinitionLogs, Boolean.TRUE);
         if (saveTaskResult == Constants.EXIT_CODE_SUCCESS) {
             logger.info("The task has not changed, so skip");
         }
@@ -271,12 +271,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
             throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
         }
-        int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true);
+        int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
         if (insertVersion == 0) {
             putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
             throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
         }
-        int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs);
+        int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(),
+            insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
         if (insertResult == Constants.EXIT_CODE_SUCCESS) {
             putMsg(result, Status.SUCCESS);
             result.put(Constants.DATA_LIST, processDefinition);
@@ -590,7 +591,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                                                 ProcessDefinition processDefinitionDeepCopy,
                                                 List<TaskDefinitionLog> taskDefinitionLogs) {
         Map<String, Object> result = new HashMap<>();
-        int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(), taskDefinitionLogs);
+        int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(), taskDefinitionLogs, Boolean.TRUE);
         if (saveTaskResult == Constants.EXIT_CODE_SUCCESS) {
             logger.info("The task has not changed, so skip");
         }
@@ -603,14 +604,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             insertVersion = processDefinitionDeepCopy.getVersion();
         } else {
             processDefinition.setUpdateTime(new Date());
-            insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true);
+            insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
         }
         if (insertVersion == 0) {
             putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
             throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
         int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
-            processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs);
+            processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
         if (insertResult == Constants.EXIT_CODE_SUCCESS) {
             putMsg(result, Status.SUCCESS);
             result.put(Constants.DATA_LIST, processDefinition);
@@ -748,7 +749,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         }
         switch (releaseState) {
             case ONLINE:
-                List<ProcessTaskRelation> relationList = processService.findRelationByCode(projectCode, code);
+                List<ProcessTaskRelation> relationList = processService.findRelationByCode(code, processDefinition.getVersion());
                 if (CollectionUtils.isEmpty(relationList)) {
                     putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
                     return result;
@@ -1899,7 +1900,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
 
     private Map<String, Object> createEmptyDagDefine(User loginUser, ProcessDefinition processDefinition) {
         Map<String, Object> result = new HashMap<>();
-        int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true);
+        int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
         if (insertVersion == 0) {
             putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
             throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
@@ -2103,7 +2104,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         }
         switch (releaseState) {
             case ONLINE:
-                List<ProcessTaskRelation> relationList = processService.findRelationByCode(projectCode, code);
+                List<ProcessTaskRelation> relationList = processService.findRelationByCode(code, processDefinition.getVersion());
                 if (CollectionUtils.isEmpty(relationList)) {
                     putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
                     return result;

+ 45 - 54
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@@ -470,64 +470,55 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
             return result;
         }
         setProcessInstance(processInstance, tenantCode, scheduleTime, globalParams, timeout);
-        if (Boolean.TRUE.equals(syncDefine)) {
-            List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
-            if (taskDefinitionLogs.isEmpty()) {
-                putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
+        List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
+        if (taskDefinitionLogs.isEmpty()) {
+            putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
+            return result;
+        }
+        for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
+            if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionLog)) {
+                putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
                 return result;
             }
-            for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
-                if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionLog)) {
-                    putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
-                    return result;
-                }
-            }
-            int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs);
-            if (saveTaskResult == Constants.DEFINITION_FAILURE) {
-                putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
-                throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
-            }
-            ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
-            List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
-            //check workflow json is valid
-            result = processDefinitionService.checkProcessNodeList(taskRelationJson);
-            if (result.get(Constants.STATUS) != Status.SUCCESS) {
+        }
+        int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs, syncDefine);
+        if (saveTaskResult == Constants.DEFINITION_FAILURE) {
+            putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
+            throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
+        }
+        ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
+        List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
+        //check workflow json is valid
+        result = processDefinitionService.checkProcessNodeList(taskRelationJson);
+        if (result.get(Constants.STATUS) != Status.SUCCESS) {
+            return result;
+        }
+        int tenantId = -1;
+        if (!Constants.DEFAULT.equals(tenantCode)) {
+            Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
+            if (tenant == null) {
+                putMsg(result, Status.TENANT_NOT_EXIST);
                 return result;
             }
-            int tenantId = -1;
-            if (!Constants.DEFAULT.equals(tenantCode)) {
-                Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
-                if (tenant == null) {
-                    putMsg(result, Status.TENANT_NOT_EXIST);
-                    return result;
-                }
-                tenantId = tenant.getId();
-            }
-            ProcessDefinition processDefinitionDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class);
-            processDefinition.set(projectCode, processDefinition.getName(), processDefinition.getDescription(), globalParams, locations, timeout, tenantId);
-            processDefinition.setUpdateTime(new Date());
-            int insertVersion;
-            if (processDefinition.equals(processDefinitionDeepCopy)) {
-                insertVersion = processDefinitionDeepCopy.getVersion();
-            } else {
-                processDefinition.setUpdateTime(new Date());
-                insertVersion = processService.saveProcessDefine(loginUser, processDefinition, false);
-            }
-            if (insertVersion == 0) {
-                putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
-                throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
-            }
-            int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
-                processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs);
-            if (insertResult == Constants.EXIT_CODE_SUCCESS) {
-                putMsg(result, Status.SUCCESS);
-                result.put(Constants.DATA_LIST, processDefinition);
-            } else {
-                putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
-                throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
-            }
-            processInstance.setProcessDefinitionVersion(insertVersion);
+            tenantId = tenant.getId();
+        }
+        processDefinition.set(projectCode, processDefinition.getName(), processDefinition.getDescription(), globalParams, locations, timeout, tenantId);
+        processDefinition.setUpdateTime(new Date());
+        int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, syncDefine, Boolean.FALSE);
+        if (insertVersion == 0) {
+            putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+            throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
+        }
+        int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
+            processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, syncDefine);
+        if (insertResult == Constants.EXIT_CODE_SUCCESS) {
+            putMsg(result, Status.SUCCESS);
+            result.put(Constants.DATA_LIST, processDefinition);
+        } else {
+            putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+            throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
+        processInstance.setProcessDefinitionVersion(insertVersion);
         int update = processService.updateProcessInstance(processInstance);
         if (update == 0) {
             putMsg(result, Status.UPDATE_PROCESS_INSTANCE_ERROR);
@@ -745,7 +736,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
             processInstance.getProcessDefinitionCode(),
             processInstance.getProcessDefinitionVersion()
         );
-        if (processDefinition != null && projectCode != processDefinition.getProjectCode()) {
+        if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
             putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
             return result;
         }

+ 3 - 3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@@ -137,7 +137,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
                 return result;
             }
         }
-        int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs);
+        int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs, Boolean.TRUE);
         if (saveTaskResult == Constants.DEFINITION_FAILURE) {
             putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
             throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
@@ -230,13 +230,13 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
                 processTaskRelationLogList.addAll(processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()));
             }
             int insertResult = processService.saveTaskRelation(loginUser, projectCode, processDefinition.getCode(), processDefinition.getVersion(),
-                processTaskRelationLogList, null);
+                processTaskRelationLogList, Lists.newArrayList(), Boolean.TRUE);
             if (insertResult != Constants.EXIT_CODE_SUCCESS) {
                 putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
                 throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
             }
         }
-        int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, Lists.newArrayList(taskDefinition));
+        int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, Lists.newArrayList(taskDefinition), Boolean.TRUE);
         if (saveTaskResult == Constants.DEFINITION_FAILURE) {
             putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
             throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);

+ 6 - 6
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@@ -298,7 +298,7 @@ public class ProcessDefinitionServiceTest {
         processDefinitionList.add(definition);
         Set<Long> definitionCodes = Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
         Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList);
-        Mockito.when(processService.saveProcessDefine(loginUser, definition, true)).thenReturn(2);
+        Mockito.when(processService.saveProcessDefine(loginUser, definition, Boolean.TRUE, Boolean.TRUE)).thenReturn(2);
         Map<String, Object> map3 = processDefinitionService.batchCopyProcessDefinition(
                 loginUser, projectCode, "46", 1L);
         Assert.assertEquals(Status.SUCCESS, map3.get(Constants.STATUS));
@@ -330,7 +330,7 @@ public class ProcessDefinitionServiceTest {
         processDefinitionList.add(definition);
         Set<Long> definitionCodes = Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
         Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList);
-        Mockito.when(processService.saveProcessDefine(loginUser, definition, true)).thenReturn(2);
+        Mockito.when(processService.saveProcessDefine(loginUser, definition, Boolean.TRUE, Boolean.TRUE)).thenReturn(2);
         Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 46L)).thenReturn(getProcessTaskRelation(projectCode));
         putMsg(result, Status.SUCCESS);
 
@@ -442,7 +442,7 @@ public class ProcessDefinitionServiceTest {
         processTaskRelation.setProcessDefinitionCode(46L);
         processTaskRelation.setPostTaskCode(123L);
         processTaskRelationList.add(processTaskRelation);
-        Mockito.when(processService.findRelationByCode(projectCode, 46L)).thenReturn(processTaskRelationList);
+        Mockito.when(processService.findRelationByCode(46L, 1)).thenReturn(processTaskRelationList);
         Map<String, Object> onlineRes = processDefinitionService.releaseProcessDefinition(
                 loginUser, projectCode, 46, ReleaseState.ONLINE);
         Assert.assertEquals(Status.SUCCESS, onlineRes.get(Constants.STATUS));
@@ -692,10 +692,10 @@ public class ProcessDefinitionServiceTest {
         Mockito.when(dataSourceMapper.queryDataSourceByNameAndUserId(userId, "mysql_1")).thenReturn(dataSource);
 
         long projectCode =  1001;
-        Mockito.when(processService.saveTaskDefine(Mockito.same(loginUser), Mockito.eq(projectCode), Mockito.notNull())).thenReturn(2);
-        Mockito.when(processService.saveProcessDefine(Mockito.same(loginUser), Mockito.notNull(), Mockito.notNull())).thenReturn(1);
+        Mockito.when(processService.saveTaskDefine(Mockito.same(loginUser), Mockito.eq(projectCode), Mockito.notNull(), Mockito.anyBoolean())).thenReturn(2);
+        Mockito.when(processService.saveProcessDefine(Mockito.same(loginUser), Mockito.notNull(), Mockito.notNull(), Mockito.anyBoolean())).thenReturn(1);
         Mockito.when(processService.saveTaskRelation(Mockito.same(loginUser), Mockito.eq(projectCode), Mockito.anyLong(),
-            Mockito.eq(1), Mockito.notNull(), Mockito.notNull())).thenReturn(0);
+            Mockito.eq(1), Mockito.notNull(), Mockito.notNull(), Mockito.anyBoolean())).thenReturn(0);
 
         Map<String, Object> result = processDefinitionService.importSqlProcessDefinition(loginUser, projectCode, mockMultipartFile);
 

+ 3 - 2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@@ -424,7 +424,7 @@ public class ProcessInstanceServiceTest {
         when(tenantMapper.queryByTenantCode("root")).thenReturn(tenant);
         when(processService.getTenantForProcess(Mockito.anyInt(), Mockito.anyInt())).thenReturn(tenant);
         when(processService.updateProcessInstance(processInstance)).thenReturn(1);
-        when(processService.saveProcessDefine(loginUser, processDefinition, false)).thenReturn(1);
+        when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.FALSE)).thenReturn(1);
         when(processDefinitionService.checkProcessNodeList(shellJson)).thenReturn(result);
         putMsg(result, Status.SUCCESS, projectCode);
         Map<String, Object> processInstanceFinishRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
@@ -435,8 +435,9 @@ public class ProcessInstanceServiceTest {
         when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
         putMsg(result, Status.SUCCESS, projectCode);
 
+        when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.FALSE, Boolean.FALSE)).thenReturn(1);
         Map<String, Object> successRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
-            shellJson, taskJson,"2020-02-21 00:00:00", false, "", "", 0, "root");
+            shellJson, taskJson,"2020-02-21 00:00:00", Boolean.FALSE, "", "", 0, "root");
         Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
     }
 

+ 1 - 1
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java

@@ -97,7 +97,7 @@ public class TaskDefinitionServiceImplTest {
             + "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
             + "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]";
         List<TaskDefinitionLog> taskDefinitions = JSONUtils.toList(createTaskDefinitionJson, TaskDefinitionLog.class);
-        Mockito.when(processService.saveTaskDefine(loginUser, projectCode, taskDefinitions)).thenReturn(1);
+        Mockito.when(processService.saveTaskDefine(loginUser, projectCode, taskDefinitions, Boolean.TRUE)).thenReturn(1);
         Map<String, Object> relation = taskDefinitionService
             .createTaskDefinition(loginUser, projectCode, createTaskDefinitionJson);
         Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));

+ 4 - 4
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql

@@ -478,7 +478,7 @@ CREATE TABLE `t_ds_task_definition` (
   `delay_time` int(11) DEFAULT '0' COMMENT 'delay execution time,unit: minute',
   `resource_ids` text COMMENT 'resource id, separated by comma',
   `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id',
-  `task_group_priority` tinyint(4) DEFAULT 1 COMMENT 'task group priority',
+  `task_group_priority` tinyint(4) DEFAULT '0' COMMENT 'task group priority',
   `create_time` datetime NOT NULL COMMENT 'create time',
   `update_time` datetime NOT NULL COMMENT 'update time',
   PRIMARY KEY (`id`,`code`)
@@ -511,7 +511,7 @@ CREATE TABLE `t_ds_task_definition_log` (
   `resource_ids` text DEFAULT NULL COMMENT 'resource id, separated by comma',
   `operator` int(11) DEFAULT NULL COMMENT 'operator user id',
   `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id',
-  `task_group_priority` tinyint(4) DEFAULT 1 COMMENT 'task group priority',
+  `task_group_priority` tinyint(4) DEFAULT 0 COMMENT 'task group priority',
   `operate_time` datetime DEFAULT NULL COMMENT 'operate time',
   `create_time` datetime NOT NULL COMMENT 'create time',
   `update_time` datetime NOT NULL COMMENT 'update time',
@@ -538,7 +538,7 @@ CREATE TABLE `t_ds_process_task_relation` (
   `create_time` datetime NOT NULL COMMENT 'create time',
   `update_time` datetime NOT NULL COMMENT 'update time',
   PRIMARY KEY (`id`),
-  KEY `idx_project_code_project_code` (`project_code`,`process_definition_code`)
+  KEY `idx_code` (`project_code`,`process_definition_code`)
 ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
 
 -- ----------------------------
@@ -562,7 +562,7 @@ CREATE TABLE `t_ds_process_task_relation_log` (
   `create_time` datetime NOT NULL COMMENT 'create time',
   `update_time` datetime NOT NULL COMMENT 'update time',
   PRIMARY KEY (`id`),
-  KEY `idx_project_code_project_code` (`project_code`,`process_definition_code`)
+  KEY `idx_process_code_version` (`process_definition_code`,`process_definition_version`)
 ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
 
 -- ----------------------------

+ 3 - 3
dolphinscheduler-dao/src/main/resources/sql/upgrade/2.1.0_schema/mysql/dolphinscheduler_ddl.sql

@@ -16,11 +16,11 @@
 */
 
 ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `task_params` longtext COMMENT 'job custom parameters' AFTER `app_link`;
-ALTER TABLE `t_ds_process_task_relation` ADD INDEX `idx_project_code_process_definition_code` (`project_code`, `process_definition_code`) USING BTREE;
-ALTER TABLE `t_ds_process_task_relation_log` ADD INDEX `idx_project_code_process_definition_code` (`project_code`, `process_definition_code`) USING BTREE;
+ALTER TABLE `t_ds_process_task_relation` ADD KEY `idx_code` (`project_code`, `process_definition_code`) USING BTREE;
+ALTER TABLE `t_ds_process_task_relation_log` ADD KEY `idx_process_code_version` (`process_definition_code`,`process_definition_version`) USING BTREE;
 
 ALTER TABLE `t_ds_task_definition_log` ADD INDEX `idx_code_version` (`code`,`version`) USING BTREE;
 alter table t_ds_task_definition_log add `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `resource_ids`;
 alter table t_ds_task_definition_log add `task_group_priority` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `task_group_id`;
 alter table t_ds_task_definition add `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `resource_ids`;
-alter table t_ds_task_definition add `task_group_priority` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `task_group_id`;
+alter table t_ds_task_definition add `task_group_priority` int(11) DEFAULT '0' COMMENT 'task group id' AFTER `task_group_id`;

+ 1 - 1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@@ -716,7 +716,7 @@ public class WorkflowExecuteThread {
 
         List<TaskInstance> recoverNodeList = getStartTaskInstanceList(processInstance.getCommandParam());
 
-        List<ProcessTaskRelation> processTaskRelations = processService.findRelationByCode(processDefinition.getProjectCode(), processDefinition.getCode());
+        List<ProcessTaskRelation> processTaskRelations = processService.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion());
         List<TaskDefinitionLog> taskDefinitionLogs = processService.getTaskDefineLogListByRelation(processTaskRelations);
         List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs);
         forbiddenTaskMap.clear();

+ 48 - 34
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@@ -2254,7 +2254,7 @@ public class ProcessService {
         return StringUtils.join(resourceIds, ",");
     }
 
-    public int saveTaskDefine(User operator, long projectCode, List<TaskDefinitionLog> taskDefinitionLogs) {
+    public int saveTaskDefine(User operator, long projectCode, List<TaskDefinitionLog> taskDefinitionLogs, Boolean syncDefine) {
         Date now = new Date();
         List<TaskDefinitionLog> newTaskDefinitionLogs = new ArrayList<>();
         List<TaskDefinitionLog> updateTaskDefinitionLogs = new ArrayList<>();
@@ -2299,13 +2299,21 @@ public class ProcessService {
                 newTaskDefinitionLogs.add(taskDefinitionToUpdate);
             } else {
                 insertResult += taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
-                taskDefinitionToUpdate.setId(task.getId());
-                updateResult += taskDefinitionMapper.updateById(taskDefinitionToUpdate);
+                if (Boolean.TRUE.equals(syncDefine)) {
+                    taskDefinitionToUpdate.setId(task.getId());
+                    updateResult += taskDefinitionMapper.updateById(taskDefinitionToUpdate);
+                } else {
+                    updateResult++;
+                }
             }
         }
         if (!newTaskDefinitionLogs.isEmpty()) {
-            updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
             insertResult += taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs);
+            if (Boolean.TRUE.equals(syncDefine)) {
+                updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
+            } else {
+                updateResult += newTaskDefinitionLogs.size();
+            }
         }
         return (insertResult & updateResult) > 0 ? 1 : Constants.EXIT_CODE_SUCCESS;
     }
@@ -2313,7 +2321,7 @@ public class ProcessService {
     /**
      * save processDefinition (including create or update processDefinition)
      */
-    public int saveProcessDefine(User operator, ProcessDefinition processDefinition, Boolean isFromProcessDefine) {
+    public int saveProcessDefine(User operator, ProcessDefinition processDefinition, Boolean syncDefine, Boolean isFromProcessDefine) {
         ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
         Integer version = processDefineLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
         int insertVersion = version == null || version == 0 ? Constants.VERSION_FIRST : version + 1;
@@ -2322,12 +2330,14 @@ public class ProcessService {
         processDefinitionLog.setOperator(operator.getId());
         processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
         int insertLog = processDefineLogMapper.insert(processDefinitionLog);
-        int result;
-        if (0 == processDefinition.getId()) {
-            result = processDefineMapper.insert(processDefinitionLog);
-        } else {
-            processDefinitionLog.setId(processDefinition.getId());
-            result = processDefineMapper.updateById(processDefinitionLog);
+        int result = 1;
+        if (Boolean.TRUE.equals(syncDefine)) {
+            if (0 == processDefinition.getId()) {
+                result = processDefineMapper.insert(processDefinitionLog);
+            } else {
+                processDefinitionLog.setId(processDefinition.getId());
+                result = processDefineMapper.updateById(processDefinitionLog);
+            }
         }
         return (insertLog & result) > 0 ? insertVersion : 0;
     }
@@ -2336,7 +2346,8 @@ public class ProcessService {
      * save task relations
      */
     public int saveTaskRelation(User operator, long projectCode, long processDefinitionCode, int processDefinitionVersion,
-                                List<ProcessTaskRelationLog> taskRelationList, List<TaskDefinitionLog> taskDefinitionLogs) {
+                                List<ProcessTaskRelationLog> taskRelationList, List<TaskDefinitionLog> taskDefinitionLogs,
+                                Boolean syncDefine) {
         if (taskRelationList.isEmpty()) {
             return Constants.EXIT_CODE_SUCCESS;
         }
@@ -2365,19 +2376,22 @@ public class ProcessService {
             processTaskRelationLog.setOperator(operator.getId());
             processTaskRelationLog.setOperateTime(now);
         }
-        List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
-        if (!processTaskRelationList.isEmpty()) {
-            Set<Integer> processTaskRelationSet = processTaskRelationList.stream().map(ProcessTaskRelation::hashCode).collect(toSet());
-            Set<Integer> taskRelationSet = taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet());
-            boolean result = CollectionUtils.isEqualCollection(processTaskRelationSet, taskRelationSet);
-            if (result) {
-                return Constants.EXIT_CODE_SUCCESS;
+        int insert = taskRelationList.size();
+        if (Boolean.TRUE.equals(syncDefine)) {
+            List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
+            if (!processTaskRelationList.isEmpty()) {
+                Set<Integer> processTaskRelationSet = processTaskRelationList.stream().map(ProcessTaskRelation::hashCode).collect(toSet());
+                Set<Integer> taskRelationSet = taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet());
+                boolean result = CollectionUtils.isEqualCollection(processTaskRelationSet, taskRelationSet);
+                if (result) {
+                    return Constants.EXIT_CODE_SUCCESS;
+                }
+                processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode);
             }
-            processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode);
+            insert = processTaskRelationMapper.batchInsert(taskRelationList);
         }
-        int result = processTaskRelationMapper.batchInsert(taskRelationList);
         int resultLog = processTaskRelationLogMapper.batchInsert(taskRelationList);
-        return (result & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
+        return (insert & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
     }
 
     public boolean isTaskOnline(long taskCode) {
@@ -2400,14 +2414,15 @@ public class ProcessService {
 
     /**
      * Generate the DAG Graph based on the process definition id
+     * Use temporarily before refactoring taskNode
      *
      * @param processDefinition process definition
      * @return dag graph
      */
     public DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) {
-        List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
-        List<TaskNode> taskNodeList = transformTask(processTaskRelations, Lists.newArrayList());
-        ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, new ArrayList<>(processTaskRelations));
+        List<ProcessTaskRelation> taskRelations = this.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion());
+        List<TaskNode> taskNodeList = transformTask(taskRelations, Lists.newArrayList());
+        ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, new ArrayList<>(taskRelations));
         // Generate concrete Dag to be executed
         return DagHelper.buildDagGraph(processDag);
     }
@@ -2416,12 +2431,10 @@ public class ProcessService {
      * generate DagData
      */
     public DagData genDagData(ProcessDefinition processDefinition) {
-        List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
-        List<TaskDefinitionLog> taskDefinitionLogList = genTaskDefineList(processTaskRelations);
-        List<TaskDefinition> taskDefinitions = taskDefinitionLogList.stream()
-                .map(taskDefinitionLog -> JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinitionLog), TaskDefinition.class))
-                .collect(Collectors.toList());
-        return new DagData(processDefinition, processTaskRelations, taskDefinitions);
+        List<ProcessTaskRelation> taskRelations = this.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion());
+        List<TaskDefinitionLog> taskDefinitionLogList = genTaskDefineList(taskRelations);
+        List<TaskDefinition> taskDefinitions = taskDefinitionLogList.stream().map(t -> (TaskDefinition) t).collect(Collectors.toList());
+        return new DagData(processDefinition, taskRelations, taskDefinitions);
     }
 
     public List<TaskDefinitionLog> genTaskDefineList(List<ProcessTaskRelation> processTaskRelations) {
@@ -2465,10 +2478,11 @@ public class ProcessService {
     }
 
     /**
-     * find process task relation list by projectCode and processDefinitionCode
+     * find process task relation list by process
      */
-    public List<ProcessTaskRelation> findRelationByCode(long projectCode, long processDefinitionCode) {
-        return processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
+    public List<ProcessTaskRelation> findRelationByCode(long processDefinitionCode, int processDefinitionVersion) {
+        List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinitionCode, processDefinitionVersion);
+        return processTaskRelationLogList.stream().map(r -> (ProcessTaskRelation) r).collect(Collectors.toList());
     }
 
     /**

+ 4 - 4
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@@ -566,7 +566,7 @@ public class ProcessServiceTest {
         Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskDefinition.getCode(), taskDefinition.getVersion())).thenReturn(taskDefinition);
         Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinition.getCode())).thenReturn(1);
         Mockito.when(taskDefinitionMapper.queryByCode(taskDefinition.getCode())).thenReturn(taskDefinition);
-        int result = processService.saveTaskDefine(operator, projectCode, taskDefinitionLogs);
+        int result = processService.saveTaskDefine(operator, projectCode, taskDefinitionLogs, Boolean.TRUE);
         Assert.assertEquals(0, result);
     }
 
@@ -579,7 +579,7 @@ public class ProcessServiceTest {
         processDefinition.setVersion(1);
         processDefinition.setCode(11L);
 
-        ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+        ProcessTaskRelationLog processTaskRelation = new ProcessTaskRelationLog();
         processTaskRelation.setName("def 1");
         processTaskRelation.setProcessDefinitionVersion(1);
         processTaskRelation.setProjectCode(1L);
@@ -588,7 +588,7 @@ public class ProcessServiceTest {
         processTaskRelation.setPreTaskCode(2L);
         processTaskRelation.setUpdateTime(new Date());
         processTaskRelation.setCreateTime(new Date());
-        List<ProcessTaskRelation> list = new ArrayList<>();
+        List<ProcessTaskRelationLog> list = new ArrayList<>();
         list.add(processTaskRelation);
 
         TaskDefinitionLog taskDefinition = new TaskDefinitionLog();
@@ -616,7 +616,7 @@ public class ProcessServiceTest {
         taskDefinitionLogs.add(td2);
 
         Mockito.when(taskDefinitionLogMapper.queryByTaskDefinitions(any())).thenReturn(taskDefinitionLogs);
-        Mockito.when(processTaskRelationMapper.queryByProcessCode(Mockito.anyLong(), Mockito.anyLong())).thenReturn(list);
+        Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong(), Mockito.anyInt())).thenReturn(list);
 
         DAG<String, TaskNode, TaskNodeRelation> stringTaskNodeTaskNodeRelationDAG = processService.genDagGraph(processDefinition);
         Assert.assertEquals(1, stringTaskNodeTaskNodeRelationDAG.getNodesCount());