Browse Source

[Fix] while repeat running, the global parameters will not set to the user specified start parameters. (#4779)

* [fix]when repeat running, there are no startparams which was set in the first start

* [fix] cannot set global param while rerun process instance

* fix code format

* delete unrelated test

* Update ExecutorControllerTest.java

* fix code smell

* Update ExecutorService2Test.java

don't know why e2e failed

* replace duplicate code
WilliamChen-luckbob 4 years ago
parent
commit
e47e4a70ab

+ 24 - 9
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@@ -24,6 +24,7 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
 import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
 
+import org.apache.commons.collections.MapUtils;
 import org.apache.dolphinscheduler.api.enums.ExecuteType;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.ExecutorService;
@@ -261,15 +262,22 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
             putMsg(result, Status.TENANT_NOT_SUITABLE);
         }
 
+        //get the startParams user specified at the first starting while repeat running is needed
+        Map<String, Object> commandMap = JSONUtils.toMap(processInstance.getCommandParam(), String.class, Object.class);
+        String startParams = null;
+        if (MapUtils.isNotEmpty(commandMap) && executeType == ExecuteType.REPEAT_RUNNING) {
+            startParams = (commandMap.get(Constants.CMD_PARAM_START_PARAMS)).toString();
+        }
+
         switch (executeType) {
             case REPEAT_RUNNING:
-                result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.REPEAT_RUNNING);
+                result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.REPEAT_RUNNING, startParams);
                 break;
             case RECOVER_SUSPENDED_PROCESS:
-                result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.RECOVER_SUSPENDED_PROCESS);
+                result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
                 break;
             case START_FAILURE_TASK_PROCESS:
-                result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.START_FAILURE_TASK_PROCESS);
+                result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.START_FAILURE_TASK_PROCESS, startParams);
                 break;
             case STOP:
                 if (processInstance.getState() == ExecutionStatus.READY_STOP) {
@@ -379,19 +387,26 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
     /**
      * insert command, used in the implementation of the page, re run, recovery (pause / failure) execution
      *
-     * @param loginUser login user
-     * @param instanceId instance id
+     * @param loginUser           login user
+     * @param instanceId          instance id
      * @param processDefinitionId process definition id
-     * @param commandType command type
+     * @param commandType         command type
      * @return insert result code
      */
-    private Map<String, Object> insertCommand(User loginUser, Integer instanceId, Integer processDefinitionId, CommandType commandType) {
+    private Map<String, Object> insertCommand(User loginUser, Integer instanceId, Integer processDefinitionId, CommandType commandType, String startParams) {
         Map<String, Object> result = new HashMap<>();
+
+        //To add startParams only when repeat running is needed
+        Map<String, Object> cmdParam = new HashMap<>();
+        cmdParam.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, instanceId);
+        if (StringUtils.isNotEmpty(startParams)) {
+            cmdParam.put(CMD_PARAM_START_PARAMS, startParams);
+        }
+
         Command command = new Command();
         command.setCommandType(commandType);
         command.setProcessDefinitionId(processDefinitionId);
-        command.setCommandParam(String.format("{\"%s\":%d}",
-                CMD_PARAM_RECOVER_PROCESS_ID_STRING, instanceId));
+        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
         command.setExecutorId(loginUser.getId());
 
         if (!processService.verifyIsNeedCreateCommand(command)) {

+ 10 - 1
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java

@@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl;
 import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
@@ -260,6 +261,14 @@ public class ExecutorService2Test {
 
     }
 
+    @Test
+    public void testExecuteRepeatRunning() throws Exception {
+        Mockito.when(processService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true);
+        
+        Map<String, Object> result = executorService.execute(loginUser, projectName, processInstanceId, ExecuteType.REPEAT_RUNNING);
+        Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
+    }
+
     private List<Server> getMasterServersList() {
         List<Server> masterServerList = new ArrayList<>();
         Server masterServer1 = new Server();
@@ -295,4 +304,4 @@ public class ExecutorService2Test {
         result.put(Constants.STATUS, Status.SUCCESS);
         return result;
     }
-}
+}

+ 35 - 17
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@@ -563,23 +563,8 @@ public class ProcessService {
         processInstance.setLocations(processDefinition.getLocations());
         processInstance.setConnects(processDefinition.getConnects());
 
-        // get start params from command param
-        Map<String, String> startParamMap = null;
-        if (cmdParam != null && cmdParam.containsKey(Constants.CMD_PARAM_START_PARAMS)) {
-            String startParamJson = cmdParam.get(Constants.CMD_PARAM_START_PARAMS);
-            startParamMap = JSONUtils.toMap(startParamJson);
-        }
-
-        // set start param into global params
-        if (startParamMap != null && startParamMap.size() > 0
-                && processDefinition.getGlobalParamMap() != null) {
-            for (Map.Entry<String, String> param : processDefinition.getGlobalParamMap().entrySet()) {
-                String val = startParamMap.get(param.getKey());
-                if (val != null) {
-                    param.setValue(val);
-                }
-            }
-        }
+        // reset global params while there are start parameters
+        setGlobalParamIfCommanded(processDefinition,cmdParam);
 
         // curing global params
         processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
@@ -599,6 +584,26 @@ public class ProcessService {
         return processInstance;
     }
 
+    private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
+        // get start params from command param
+        Map<String, String> startParamMap = null;
+        if (cmdParam != null && cmdParam.containsKey(Constants.CMD_PARAM_START_PARAMS)) {
+            String startParamJson = cmdParam.get(Constants.CMD_PARAM_START_PARAMS);
+            startParamMap = JSONUtils.toMap(startParamJson);
+        }
+
+        // set start param into global params
+        if (startParamMap != null && startParamMap.size() > 0
+                && processDefinition.getGlobalParamMap() != null) {
+            for (Map.Entry<String, String> param : processDefinition.getGlobalParamMap().entrySet()) {
+                String val = startParamMap.get(param.getKey());
+                if (val != null) {
+                    param.setValue(val);
+                }
+            }
+        }
+    }
+
     /**
      * get process tenant
      * there is tenant id in definition, use the tenant of the definition.
@@ -690,7 +695,20 @@ public class ProcessService {
                 processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
             } else {
                 processInstance = this.findProcessInstanceDetailById(processInstanceId);
+                CommandType commandTypeIfComplement = getCommandTypeIfComplement(processInstance, command);
+
+                // reset global params while repeat running is needed by cmdParam
+                if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) {
+                    setGlobalParamIfCommanded(processDefinition, cmdParam);
+                }
+
                 // Recalculate global parameters after rerun.
+
+                processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
+                    processDefinition.getGlobalParamMap(),
+                    processDefinition.getGlobalParamList(),
+                    commandTypeIfComplement,
+                    processInstance.getScheduleTime()));
             }
             processDefinition = processDefineMapper.selectById(processInstance.getProcessDefinitionId());
             processInstance.setProcessDefinition(processDefinition);