浏览代码

Solve transaction problems under multi-threading (#10917)

WangJPLeo 2 年之前
父节点
当前提交
2397423eb6

+ 5 - 12
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@@ -321,18 +321,11 @@ public class ProcessServiceImpl implements ProcessService {
         //serial wait
         //when we get the running instance(or waiting instance) only get the priority instance(by id)
         if (processDefinition.getExecutionType().typeIsSerialWait()) {
-            while (true) {
-                List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
-                        processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
-                if (CollectionUtils.isEmpty(runningProcessInstances)) {
-                    processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
-                    saveProcessInstance(processInstance);
-                    return;
-                }
-                ProcessInstance runningProcess = runningProcessInstances.get(0);
-                if (this.processInstanceMapper.updateNextProcessIdById(processInstance.getId(), runningProcess.getId())) {
-                    return;
-                }
+            List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
+                    processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
+            if (CollectionUtils.isEmpty(runningProcessInstances)) {
+                processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+                saveProcessInstance(processInstance);
             }
         } else if (processDefinition.getExecutionType().typeIsSerialDiscard()) {
             List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),