Forráskód Böngészése

[Bug][Api] Failed to rerun the workflow instance (#9795)

hstdream 3 éve
szülő
commit
341e220bf0

+ 1 - 1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@@ -241,7 +241,7 @@ public enum Status {
     PROCESS_INSTANCE_NOT_EXIST(50001, "process instance {0} does not exist", "工作流实例[{0}]不存在"),
     PROCESS_INSTANCE_EXIST(50002, "process instance {0} already exists", "工作流实例[{0}]已存在"),
     PROCESS_DEFINE_NOT_EXIST(50003, "process definition {0} does not exist", "工作流定义[{0}]不存在"),
-    PROCESS_DEFINE_NOT_RELEASE(50004, "process definition {0} not on line", "工作流定义[{0}]不是上线状态"),
+    PROCESS_DEFINE_NOT_RELEASE(50004, "process definition {0} process version {1} not on line", "工作流定义[{0}] 工作流版本[{1}]不是上线状态"),
     SUB_PROCESS_DEFINE_NOT_RELEASE(50004, "exist sub process definition not on line", "存在子工作流定义不是上线状态"),
     PROCESS_INSTANCE_ALREADY_CHANGED(50005, "the status of process instance {0} is already {1}", "工作流实例[{0}]的状态已经是[{1}]"),
     PROCESS_INSTANCE_STATE_OPERATION_ERROR(50006, "the status of process instance {0} is {1},Cannot perform {2} operation", "工作流实例[{0}]的状态是[{1}],无法执行[{2}]操作"),

+ 2 - 1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@@ -73,9 +73,10 @@ public interface ExecutorService {
      * @param projectCode project code
      * @param processDefinition process definition
      * @param processDefineCode process definition code
+     * @param verison process definition version
      * @return check result code
      */
-    Map<String, Object> checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition, long processDefineCode);
+    Map<String, Object> checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition, long processDefineCode, Integer verison);
 
     /**
      * do action to process instance:pause, stop, repeat, recover from pause, recover from stop

+ 5 - 4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@@ -149,7 +149,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
 
         // check process define release state
         ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
-        result = checkProcessDefinitionValid(projectCode, processDefinition, processDefinitionCode);
+        result = checkProcessDefinitionValid(projectCode, processDefinition, processDefinitionCode, processDefinition.getVersion());
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -208,17 +208,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
      * @param projectCode project code
      * @param processDefinition process definition
      * @param processDefineCode process definition code
+     * @param version process instance verison
      * @return check result code
      */
     @Override
-    public Map<String, Object> checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition, long processDefineCode) {
+    public Map<String, Object> checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition, long processDefineCode, Integer version) {
         Map<String, Object> result = new HashMap<>();
         if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
             // check process definition exists
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefineCode));
         } else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
             // check process definition online
-            putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(processDefineCode));
+            putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(processDefineCode), version);
         } else if (!checkSubProcessDefinitionValid(processDefinition)){
             // check sub process definition online
             putMsg(result, Status.SUB_PROCESS_DEFINE_NOT_RELEASE);
@@ -290,7 +291,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
                 processInstance.getProcessDefinitionVersion());
         if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) {
-            result = checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode());
+            result = checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
             if (result.get(Constants.STATUS) != Status.SUCCESS) {
                 return result;
             }

+ 1 - 1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java

@@ -150,7 +150,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
 
         // check work flow define release state
         ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefineCode);
-        result = executorService.checkProcessDefinitionValid(projectCode,processDefinition, processDefineCode);
+        result = executorService.checkProcessDefinitionValid(projectCode,processDefinition, processDefineCode, processDefinition.getVersion());
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }