Browse Source

RECOVER_TOLERANCE_FAULT_PROCESS CommandType needs the start parameters (#13958)

Drake Youngkun Min 1 year ago
parent
commit
801216bd35

+ 16 - 4
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@@ -23,6 +23,7 @@ import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.C
 import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_START_DATE;
 import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_EMPTY_SUB_PROCESS;
 import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_PARAMS;
 import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS;
 import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
 import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
@@ -770,8 +771,9 @@ public class ProcessServiceImpl implements ProcessService {
         }
 
         CommandType commandTypeIfComplement = getCommandTypeIfComplement(processInstance, command);
-        // reset global params while repeat running is needed by cmdParam
-        if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) {
+        // reset global params while repeat running and recover tolerance fault process is needed by cmdParam
+        if (commandTypeIfComplement == CommandType.REPEAT_RUNNING ||
+                commandTypeIfComplement == CommandType.RECOVER_TOLERANCE_FAULT_PROCESS) {
             setGlobalParamIfCommanded(processDefinition, cmdParam);
         }
 
@@ -1590,8 +1592,7 @@ public class ProcessServiceImpl implements ProcessService {
         cmd.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
         cmd.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
         cmd.setProcessInstanceId(processInstance.getId());
-        cmd.setCommandParam(
-                String.format("{\"%s\":%d}", CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
+        cmd.setCommandParam(JSONUtils.toJsonString(createCommandParams(processInstance)));
         cmd.setExecutorId(processInstance.getExecutorId());
         cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
         cmd.setProcessInstancePriority(processInstance.getProcessInstancePriority());
@@ -2605,4 +2606,15 @@ public class ProcessServiceImpl implements ProcessService {
         triggerRelationService.saveCommandTrigger(commandId, processInstanceId);
     }
 
+    private Map<String, Object> createCommandParams(ProcessInstance processInstance) {
+        Map<String, Object> commandMap =
+                JSONUtils.parseObject(processInstance.getCommandParam(), new TypeReference<Map<String, Object>>() {
+                });
+        Map<String, Object> recoverFailoverCommandParams = new HashMap<>();
+        Optional.ofNullable(MapUtils.getObject(commandMap, CMD_PARAM_START_PARAMS))
+                .ifPresent(startParams -> recoverFailoverCommandParams.put(CMD_PARAM_START_PARAMS, startParams));
+        recoverFailoverCommandParams.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId());
+        return recoverFailoverCommandParams;
+    }
+
 }

+ 25 - 0
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@@ -370,6 +370,31 @@ public class ProcessServiceTest {
         Mockito.when(commandMapper.deleteById(9)).thenReturn(1);
         ProcessInstance processInstance10 = processService.handleCommand(host, command9);
         Assertions.assertNotNull(processInstance10);
+
+        // build command same as processService.processNeedFailoverProcessInstances(processInstance);
+        Command command12 = new Command();
+        command12.setId(12);
+        command12.setProcessDefinitionCode(definitionCode);
+        command12.setProcessDefinitionVersion(definitionVersion);
+        command12.setProcessInstanceId(processInstanceId);
+        command12.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
+        HashMap<String, String> startParams12 = new HashMap<>();
+        startParams12.put("startParam11", "testStartParam11");
+        HashMap<String, String> commandParams12 = new HashMap<>();
+        commandParams12.put(CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(startParams12));
+        commandParams12.put("ProcessInstanceId", "222");
+        command12.setCommandParam(JSONUtils.toJsonString(commandParams12));
+        Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
+        Mockito.when(commandMapper.deleteById(12)).thenReturn(1);
+        Mockito.when(curingGlobalParamsService.curingGlobalParams(222,
+                processDefinition.getGlobalParamMap(),
+                processDefinition.getGlobalParamList(),
+                CommandType.RECOVER_TOLERANCE_FAULT_PROCESS,
+                processInstance.getScheduleTime(), null)).thenReturn("\"testStartParam11\"");
+        ProcessInstance processInstance13 = processService.handleCommand(host, command12);
+        Assertions.assertNotNull(processInstance13);
+        Assertions.assertNotNull(processInstance13.getGlobalParams());
+        Assertions.assertTrue(processInstance13.getGlobalParams().contains("\"testStartParam11\""));
     }
 
     @Test