Quellcode durchsuchen

fix bug#1336 serial complement data can have multiple process instances (#3317)

Co-authored-by: lenboo <baoliang@analysys.com.cn>
bao liang vor 4 Jahren
Ursprung
Commit
624f0aeab9

+ 6 - 12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@@ -252,6 +252,9 @@ public class MasterExecThread implements Runnable {
         }
 
         while(Stopper.isRunning()){
+
+            logger.info("process {} start to complement {} data",
+                    processInstance.getId(), DateUtils.dateToString(scheduleDate));
             // prepare dag and other info
             prepareProcess();
 
@@ -266,13 +269,13 @@ public class MasterExecThread implements Runnable {
             // execute process ,waiting for end
             runProcess();
 
+            endProcess();
             // process instance failure ,no more complements
             if(!processInstance.getState().typeIsSuccess()){
                 logger.info("process {} state {}, complement not completely!",
                         processInstance.getId(), processInstance.getState());
                 break;
             }
-
             //  current process instance success ,next execute
             if(null == iterator){
                 // loop by day
@@ -291,9 +294,7 @@ public class MasterExecThread implements Runnable {
                 }
                 scheduleDate = iterator.next();
             }
-
-            logger.info("process {} start to complement {} data",
-                    processInstance.getId(), DateUtils.dateToString(scheduleDate));
+            // flow end
             // execute next process instance complement data
             processInstance.setScheduleTime(scheduleDate);
             if(cmdParam.containsKey(Constants.CMDPARAM_RECOVERY_START_NODE_STRING)){
@@ -301,22 +302,15 @@ public class MasterExecThread implements Runnable {
                 processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
             }
 
-            List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
-            for(TaskInstance taskInstance : taskInstanceList){
-                taskInstance.setFlag(Flag.NO);
-                processService.updateTaskInstance(taskInstance);
-            }
             processInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
             processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
                     processInstance.getProcessDefinition().getGlobalParamMap(),
                     processInstance.getProcessDefinition().getGlobalParamList(),
                     CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
-
+            processInstance.setId(0);
             processService.saveProcessInstance(processInstance);
         }
 
-        // flow end
-        endProcess();
 
     }