JinYong Li 3 年之前
父节点
当前提交
2aa191014d

+ 16 - 7
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java

@@ -291,6 +291,18 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
             putMsg(result, Status.DATA_IS_NULL, "preTaskCodes");
             return result;
         }
+        List<Long> currentUpstreamList = upstreamList.stream().map(ProcessTaskRelation::getPreTaskCode).collect(Collectors.toList());
+        if (currentUpstreamList.contains(0L)) {
+            putMsg(result, Status.DATA_IS_NOT_VALID, "currentUpstreamList");
+            return result;
+        }
+        List<Long> tmpCurrent = Lists.newArrayList(currentUpstreamList);
+        tmpCurrent.removeAll(preTaskCodeList);
+        preTaskCodeList.removeAll(currentUpstreamList);
+        if (!preTaskCodeList.isEmpty()) {
+            putMsg(result, Status.DATA_IS_NOT_VALID, StringUtils.join(preTaskCodeList, Constants.COMMA));
+            return result;
+        }
         ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(upstreamList.get(0).getProcessDefinitionCode());
         if (processDefinition == null) {
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, upstreamList.get(0).getProcessDefinitionCode());
@@ -300,20 +312,17 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
         List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList(processTaskRelations);
         List<ProcessTaskRelation> processTaskRelationWaitRemove = Lists.newArrayList();
         for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
-            if (preTaskCodeList.size() > 1) {
-                if (preTaskCodeList.contains(processTaskRelation.getPreTaskCode())) {
-                    preTaskCodeList.remove(processTaskRelation.getPreTaskCode());
+            if (currentUpstreamList.size() > 1) {
+                if (currentUpstreamList.contains(processTaskRelation.getPreTaskCode())) {
+                    currentUpstreamList.remove(processTaskRelation.getPreTaskCode());
                     processTaskRelationWaitRemove.add(processTaskRelation);
                 }
             } else {
-                if (processTaskRelation.getPostTaskCode() == taskCode) {
+                if (processTaskRelation.getPostTaskCode() == taskCode && (currentUpstreamList.isEmpty() || tmpCurrent.isEmpty())) {
                     processTaskRelation.setPreTaskVersion(0);
                     processTaskRelation.setPreTaskCode(0L);
                 }
             }
-            if (preTaskCodeList.contains(processTaskRelation.getPostTaskCode())) {
-                processTaskRelationWaitRemove.add(processTaskRelation);
-            }
         }
         processTaskRelationList.removeAll(processTaskRelationWaitRemove);
         updateProcessDefiniteVersion(loginUser, result, processDefinition);