Browse Source

[Feature][JsonSplit] update processDefinite from processInstance (#5325)

* update SnowFlake

* update processDefinite from processInstance

* update processDefinite from processInstance

Co-authored-by: JinyLeeChina <297062848@qq.com>
JinyLeeChina 4 years ago
parent
commit
d60f31bcf3

+ 2 - 2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@@ -205,7 +205,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         }
 
         int saveResult = processService.saveProcessDefinition(loginUser, project, processDefinitionName, desc,
-                locations, connects, processData, processDefinition);
+                locations, connects, processData, processDefinition, true);
 
         if (saveResult > 0) {
             putMsg(result, Status.SUCCESS);
@@ -414,7 +414,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         }
         ProcessData newProcessData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
         int saveResult = processService.saveProcessDefinition(loginUser, project, name, desc,
-                locations, connects, newProcessData, processDefinition);
+                locations, connects, newProcessData, processDefinition, true);
 
         if (saveResult > 0) {
             putMsg(result, Status.SUCCESS);

+ 13 - 26
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@@ -52,7 +52,6 @@ import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessData;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
@@ -72,7 +71,6 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
-import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
@@ -425,12 +423,12 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
      * @param locations locations
      * @param connects connects
      * @return update result code
-     * @throws ParseException parse exception for json parse
      */
+    @Transactional
     @Override
     public Map<String, Object> updateProcessInstance(User loginUser, String projectName, Integer processInstanceId,
                                                      String processInstanceJson, String scheduleTime, Boolean syncDefine,
-                                                     Flag flag, String locations, String connects) throws ParseException {
+                                                     Flag flag, String locations, String connects) {
         Map<String, Object> result = new HashMap<>();
         Project project = projectMapper.queryByName(projectName);
         //check project permission
@@ -461,10 +459,10 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
         }
         Tenant tenant = processService.getTenantForProcess(processData.getTenantId(),
                 processDefinition.getUserId());
-        setProcessInstance(processInstance, tenant, scheduleTime, locations,
-                connects, processInstanceJson, processData);
+        setProcessInstance(processInstance, tenant, scheduleTime, processData);
         int updateDefine = 1;
         if (Boolean.TRUE.equals(syncDefine)) {
+            processDefinition.setId(processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()).getId());
             updateDefine = syncDefinition(loginUser, project, locations, connects,
                     processInstance, processDefinition, processData);
 
@@ -495,37 +493,29 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
         processDefinition.setTimeout(processInstance.getTimeout());
         processDefinition.setUpdateTime(new Date());
 
-        int updateDefine = processService.saveProcessDefinition(loginUser, project, processDefinition.getName(),
+        return processService.saveProcessDefinition(loginUser, project, processDefinition.getName(),
                 processDefinition.getDescription(), locations, connects,
-                processData, processDefinition);
-        return updateDefine;
+                processData, processDefinition, false);
     }
 
     /**
      * update process instance attributes
-     *
-     * @return false if check failed or
      */
-    private void setProcessInstance(ProcessInstance processInstance, Tenant tenant,
-                                    String scheduleTime, String locations, String connects, String processInstanceJson,
-                                    ProcessData processData) {
+    private void setProcessInstance(ProcessInstance processInstance, Tenant tenant, String scheduleTime, ProcessData processData) {
 
         Date schedule = processInstance.getScheduleTime();
         if (scheduleTime != null) {
             schedule = DateUtils.getScheduleDate(scheduleTime);
         }
         processInstance.setScheduleTime(schedule);
-        processInstance.setLocations(locations);
-        processInstance.setConnects(connects);
-        if (StringUtils.isNotEmpty(processInstanceJson)) {
-            return;
-        }
         List<Property> globalParamList = processData.getGlobalParams();
-        Map<String, String> globalParamMap = Optional.ofNullable(globalParamList).orElse(Collections.emptyList()).stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
+        Map<String, String> globalParamMap = Optional.ofNullable(globalParamList)
+                .orElse(Collections.emptyList())
+                .stream()
+                .collect(Collectors.toMap(Property::getProp, Property::getValue));
         String globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList,
                 processInstance.getCmdTypeIfComplement(), schedule);
-        int timeout = processData.getTimeout();
-        processInstance.setTimeout(timeout);
+        processInstance.setTimeout(processData.getTimeout());
         if (tenant != null) {
             processInstance.setTenantCode(tenant.getTenantCode());
         }
@@ -706,13 +696,10 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
             throw new RuntimeException("workflow instance is null");
         }
 
-        ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper.queryByDefinitionCodeAndVersion(
+        ProcessDefinition processDefinition = processDefinitionLogMapper.queryByDefinitionCodeAndVersion(
                 processInstance.getProcessDefinitionCode(),
                 processInstance.getProcessDefinitionVersion()
         );
-        ProcessDefinition processDefinition = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinitionLog),
-                ProcessDefinition.class);
-
         GanttDto ganttDto = new GanttDto();
         DAG<String, TaskNode, TaskNodeRelation> dag = processService.genDagGraph(processDefinition);
         //topological sort

+ 2 - 1
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@@ -456,7 +456,8 @@ public class ProcessDefinitionServiceTest {
                 , Mockito.anyString()
                 , Mockito.anyString()
                 , Mockito.any(ProcessData.class)
-                , Mockito.any(ProcessDefinition.class)))
+                , Mockito.any(ProcessDefinition.class)
+                ,true))
                 .thenReturn(1);
 
         Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(getProcessData());

+ 1 - 1
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@@ -409,7 +409,7 @@ public class ProcessInstanceServiceTest {
         when(processDefineMapper.updateById(processDefinition)).thenReturn(1);
         when(processService.saveProcessDefinition(Mockito.any(), Mockito.any(),
                 Mockito.anyString(), Mockito.anyString(), Mockito.anyString(),
-                Mockito.anyString(), Mockito.any(), Mockito.any())).thenReturn(1);
+                Mockito.anyString(), Mockito.any(), Mockito.any(), true)).thenReturn(1);
         putMsg(result, Status.SUCCESS, projectName);
 
         Map<String, Object> successRes = processInstanceService.updateProcessInstance(loginUser, projectName, 1,

+ 9 - 34
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@@ -1586,32 +1586,6 @@ public class ProcessService {
         return processInstanceMapper.updateById(processInstance);
     }
 
-    /**
-     * update the process instance
-     *
-     * @param processInstanceId processInstanceId
-     * @param processJson processJson
-     * @param globalParams globalParams
-     * @param scheduleTime scheduleTime
-     * @param flag flag
-     * @param locations locations
-     * @param connects connects
-     * @return update process instance result
-     */
-    public int updateProcessInstance(Integer processInstanceId, String processJson,
-                                     String globalParams, Date scheduleTime, Flag flag,
-                                     String locations, String connects) {
-        ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId);
-        if (processInstance != null) {
-            processInstance.setGlobalParams(globalParams);
-            processInstance.setScheduleTime(scheduleTime);
-            processInstance.setLocations(locations);
-            processInstance.setConnects(connects);
-            return processInstanceMapper.updateById(processInstance);
-        }
-        return 0;
-    }
-
     /**
      * change task state
      *
@@ -2163,13 +2137,13 @@ public class ProcessService {
     /**
      * switch process definition version to process definition log version
      */
-    public int processDefinitionToDB(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) {
+    public int processDefinitionToDB(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog, Boolean isFromProcessDefine) {
         if (null == processDefinition || null == processDefinitionLog) {
             return Constants.DEFINITION_FAILURE;
         }
 
         processDefinitionLog.setId(processDefinition.getId());
-        processDefinitionLog.setReleaseState(ReleaseState.OFFLINE);
+        processDefinitionLog.setReleaseState(isFromProcessDefine ? ReleaseState.OFFLINE : ReleaseState.ONLINE);
         processDefinitionLog.setFlag(Flag.YES);
 
         int result;
@@ -2185,7 +2159,7 @@ public class ProcessService {
      * switch process definition version to process definition log version
      */
     public int switchVersion(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) {
-        int switchResult = processDefinitionToDB(processDefinition, processDefinitionLog);
+        int switchResult = processDefinitionToDB(processDefinition, processDefinitionLog, true);
         if (switchResult != Constants.DEFINITION_FAILURE) {
             switchProcessTaskRelationVersion(processDefinition);
         }
@@ -2266,14 +2240,15 @@ public class ProcessService {
      * save processDefinition (including create or update processDefinition)
      */
     public int saveProcessDefinition(User operator, Project project, String name, String desc, String locations,
-                                     String connects, ProcessData processData, ProcessDefinition processDefinition) {
+                                     String connects, ProcessData processData, ProcessDefinition processDefinition,
+                                     Boolean isFromProcessDefine) {
         ProcessDefinitionLog processDefinitionLog = insertProcessDefinitionLog(operator, processDefinition.getCode(),
                 name, processData, project, desc, locations, connects);
-        Map<String, TaskDefinition> taskDefinitionMap = handleTaskDefinition(operator, project.getCode(), processData.getTasks());
+        Map<String, TaskDefinition> taskDefinitionMap = handleTaskDefinition(operator, project.getCode(), processData.getTasks(), isFromProcessDefine);
         if (Constants.DEFINITION_FAILURE == handleTaskRelation(operator, project.getCode(), processDefinitionLog, processData.getTasks(), taskDefinitionMap)) {
             return Constants.DEFINITION_FAILURE;
         }
-        return processDefinitionToDB(processDefinition, processDefinitionLog);
+        return processDefinitionToDB(processDefinition, processDefinitionLog, isFromProcessDefine);
     }
 
     /**
@@ -2319,7 +2294,7 @@ public class ProcessService {
     /**
      * handle task definition
      */
-    public Map<String, TaskDefinition> handleTaskDefinition(User operator, Long projectCode, List<TaskNode> taskNodes) {
+    public Map<String, TaskDefinition> handleTaskDefinition(User operator, Long projectCode, List<TaskNode> taskNodes, Boolean isFromProcessDefine) {
         if (taskNodes == null) {
             return null;
         }
@@ -2336,7 +2311,7 @@ public class ProcessService {
                 }
                 saveTaskDefinition(operator, projectCode, taskNode, taskDefinition);
             } else {
-                if (isTaskOnline(taskDefinition.getCode())) {
+                if (isFromProcessDefine && isTaskOnline(taskDefinition.getCode())) {
                     throw new ServiceException(String.format("The task %s is on line in process", taskNode.getName()));
                 }
                 updateTaskDefinition(operator, projectCode, taskNode, taskDefinition);

+ 1 - 1
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@@ -361,7 +361,7 @@ public class ProcessServiceTest {
         Mockito.when(processDefineMapper.updateById(any())).thenReturn(1);
         Mockito.when(processDefineLogMapper.insert(any())).thenReturn(1);
 
-        int i = processService.saveProcessDefinition(user, project, "name", "desc", "locations", "connects", processData, processDefinition);
+        int i = processService.saveProcessDefinition(user, project, "name", "desc", "locations", "connects", processData, processDefinition, true);
         Assert.assertEquals(1, i);
 
     }

+ 3 - 23
sql/dolphinscheduler_mysql.sql

@@ -455,7 +455,7 @@ CREATE TABLE `t_ds_task_definition` (
   `description` text COMMENT 'description',
   `project_code` bigint(20) NOT NULL COMMENT 'project code',
   `user_id` int(11) DEFAULT NULL COMMENT 'task definition creator id',
-  `task_type` varchar(50) DEFAULT NOT NULL COMMENT 'task type',
+  `task_type` varchar(50) NOT NULL COMMENT 'task type',
   `task_params` text COMMENT 'job custom parameters',
   `flag` tinyint(2) DEFAULT NULL COMMENT '0 not available, 1 available',
   `task_priority` tinyint(4) DEFAULT NULL COMMENT 'job priority',
@@ -484,7 +484,7 @@ CREATE TABLE `t_ds_task_definition_log` (
   `description` text COMMENT 'description',
   `project_code` bigint(20) NOT NULL COMMENT 'project code',
   `user_id` int(11) DEFAULT NULL COMMENT 'task definition creator id',
-  `task_type` varchar(50) DEFAULT NOT NULL COMMENT 'task type',
+  `task_type` varchar(50) NOT NULL COMMENT 'task type',
   `task_params` text COMMENT 'job custom parameters',
   `flag` tinyint(2) DEFAULT NULL COMMENT '0 not available, 1 available',
   `task_priority` tinyint(4) DEFAULT NULL COMMENT 'job priority',
@@ -546,32 +546,6 @@ CREATE TABLE `t_ds_process_task_relation_log` (
   PRIMARY KEY (`id`)
 ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
 
-DROP TABLE IF EXISTS `t_ds_process_definition_version`;
-CREATE TABLE `t_ds_process_definition_version` (
-  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
-  `process_definition_id` int(11) NOT NULL COMMENT 'process definition id',
-  `version` int(11) DEFAULT NULL COMMENT 'process definition version',
-  `process_definition_json` longtext COMMENT 'process definition json content',
-  `description` text,
-  `global_params` text COMMENT 'global parameters',
-  `locations` text COMMENT 'Node location information',
-  `connects` text COMMENT 'Node connection information',
-  `warning_group_id` int(11) DEFAULT NULL COMMENT 'alert group id',
-  `create_time` datetime DEFAULT NULL COMMENT 'create time',
-  `timeout` int(11) DEFAULT '0' COMMENT 'time out',
-  `resource_ids` varchar(255) DEFAULT NULL COMMENT 'resource ids',
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `process_definition_id_and_version` (`process_definition_id`,`version`) USING BTREE,
-  KEY `process_definition_index` (`id`) USING BTREE
-) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-
-
 -- ----------------------------
 -- Table structure for t_ds_process_instance
 -- ----------------------------
@@ -814,7 +788,7 @@ DROP TABLE IF EXISTS `t_ds_task_instance`;
 CREATE TABLE `t_ds_task_instance` (
   `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
   `name` varchar(255) DEFAULT NULL COMMENT 'task name',
-  `task_type` varchar(50) DEFAULT NOT NULL COMMENT 'task type',
+  `task_type` varchar(50) NOT NULL COMMENT 'task type',
   `task_code` bigint(20) NOT NULL COMMENT 'task definition code',
   `task_definition_version` int(11) DEFAULT NULL COMMENT 'task definition version',
   `process_instance_id` int(11) DEFAULT NULL COMMENT 'process instance id',

+ 0 - 25
sql/dolphinscheduler_postgre.sql

@@ -428,29 +428,6 @@ CREATE TABLE t_ds_process_task_relation_log (
   PRIMARY KEY (id)
 ) ;
 
---
---
-
-DROP TABLE IF EXISTS t_ds_process_definition_version;
-CREATE TABLE t_ds_process_definition_version (
-  id int NOT NULL  ,
-  process_definition_id int NOT NULL  ,
-  version int DEFAULT NULL ,
-  process_definition_json text ,
-  description text ,
-  global_params text ,
-  locations text ,
-  connects text ,
-  warning_group_id int4 DEFAULT NULL,
-  create_time timestamp DEFAULT NULL ,
-  timeout int DEFAULT '0' ,
-  resource_ids varchar(64),
-  PRIMARY KEY (id)
-) ;
-
-create index process_definition_id_and_version on t_ds_process_definition_version (process_definition_id,version);
-
 --
 -- Table structure for table t_ds_process_instance
 --
@@ -834,9 +811,6 @@ ALTER TABLE t_ds_process_task_relation ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds
 DROP SEQUENCE IF EXISTS t_ds_process_task_relation_log_id_sequence;
 CREATE SEQUENCE  t_ds_process_task_relation_log_id_sequence;
 ALTER TABLE t_ds_process_task_relation_log ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_process_task_relation_log_id_sequence');
-DROP SEQUENCE IF EXISTS t_ds_process_definition_version_id_sequence;
-CREATE SEQUENCE  t_ds_process_definition_version_id_sequence;
-ALTER TABLE t_ds_process_definition_version ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_process_definition_version_id_sequence');
 DROP SEQUENCE IF EXISTS t_ds_process_instance_id_sequence;
 CREATE SEQUENCE  t_ds_process_instance_id_sequence;
 ALTER TABLE t_ds_process_instance ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_process_instance_id_sequence');