Przeglądaj źródła

[Improvement][MasterServer] process complement data optimization #7925 (#7976)

* [Improvement][MasterServer] process complement data optimization #7925

* [Improvement][MasterServer] process complement data optimization #7925

Co-authored-by: shangeyao <sgy960921>
Assert 3 lat temu
rodzic
commit
12d7e6261e

+ 30 - 18
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@@ -609,28 +609,40 @@ public class WorkflowExecuteThread {
                 processInstance.getScheduleTime(),
                 complementListDate.toString());
             scheduleDate = complementListDate.get(index + 1);
-            //the next process complement
-            processInstance.setId(0);
         }
-        processInstance.setScheduleTime(scheduleDate);
+        //the next process complement
+        int create = this.createComplementDataCommand(scheduleDate);
+        if (create > 0) {
+            logger.info("create complement data command successfully.");
+        }
+        return true;
+    }
+
+    private int createComplementDataCommand(Date scheduleDate) {
+        Command command = new Command();
+        command.setScheduleTime(scheduleDate);
+        command.setCommandType(CommandType.COMPLEMENT_DATA);
+        command.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
         Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
         if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING)) {
             cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
-            processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
-        }
-
-        processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
-        processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
-            processDefinition.getGlobalParamMap(),
-            processDefinition.getGlobalParamList(),
-            CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
-        processInstance.setStartTime(new Date());
-        processInstance.setRestartTime(processInstance.getStartTime());
-        processInstance.setEndTime(null);
-        processService.saveProcessInstance(processInstance);
-        this.taskInstanceMap.clear();
-        startProcess();
-        return true;
+        }
+        cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.format(scheduleDate, "yyyy-MM-dd HH:mm:ss"));
+        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+        command.setTaskDependType(processInstance.getTaskDependType());
+        command.setFailureStrategy(processInstance.getFailureStrategy());
+        command.setWarningType(processInstance.getWarningType());
+        command.setWarningGroupId(processInstance.getWarningGroupId());
+        command.setStartTime(new Date());
+        command.setExecutorId(processInstance.getExecutorId());
+        command.setUpdateTime(new Date());
+        command.setProcessInstancePriority(processInstance.getProcessInstancePriority());
+        command.setWorkerGroup(processInstance.getWorkerGroup());
+        command.setEnvironmentCode(processInstance.getEnvironmentCode());
+        command.setDryRun(processInstance.getDryRun());
+        command.setProcessInstanceId(0);
+        command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
+        return processService.createCommand(command);
     }
 
     private boolean needComplementProcess() {