Browse Source

fix empty upstreamTaskCodes (#8405)

Co-authored-by: caishunfeng <534328519@qq.com>
caishunfeng 3 years ago
parent
commit
71fcea5937

+ 22 - 17
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@@ -54,6 +54,7 @@ import org.apache.commons.lang3.StringUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -150,11 +151,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
     /**
      * create single task definition that binds the workflow
      *
-     * @param loginUser             login user
-     * @param projectCode           project code
+     * @param loginUser login user
+     * @param projectCode project code
      * @param processDefinitionCode process definition code
      * @param taskDefinitionJsonObj task definition json object
-     * @param upstreamCodes         upstream task codes, sep comma
+     * @param upstreamCodes upstream task codes, sep comma
      * @return create result code
      */
     @Transactional(rollbackFor = RuntimeException.class)
@@ -227,7 +228,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
                 processTaskRelationLogList.addAll(processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()));
             }
             int insertResult = processService.saveTaskRelation(loginUser, projectCode, processDefinition.getCode(), processDefinition.getVersion(),
-                processTaskRelationLogList, Lists.newArrayList(), Boolean.TRUE);
+                    processTaskRelationLogList, Lists.newArrayList(), Boolean.TRUE);
             if (insertResult != Constants.EXIT_CODE_SUCCESS) {
                 putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
                 throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
@@ -272,6 +273,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
     /**
      * delete task definition
      * Only offline and no downstream dependency can be deleted
+     *
      * @param loginUser login user
      * @param projectCode project code
      * @param taskCode task code
@@ -301,9 +303,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
         List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryDownstreamByTaskCode(taskCode);
         if (!processTaskRelationList.isEmpty()) {
             Set<Long> postTaskCodes = processTaskRelationList
-                .stream()
-                .map(ProcessTaskRelation::getPostTaskCode)
-                .collect(Collectors.toSet());
+                    .stream()
+                    .map(ProcessTaskRelation::getPostTaskCode)
+                    .collect(Collectors.toSet());
             putMsg(result, Status.TASK_HAS_DOWNSTREAM, StringUtils.join(postTaskCodes, ","));
             return result;
         }
@@ -326,7 +328,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
     }
 
     private void updateDag(User loginUser, Map<String, Object> result, long processDefinitionCode, List<ProcessTaskRelation> processTaskRelationList,
-                                List<TaskDefinitionLog> taskDefinitionLogs) {
+                           List<TaskDefinitionLog> taskDefinitionLogs) {
         ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
         if (processDefinition == null) {
             throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST);
@@ -337,7 +339,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
         }
         List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
         int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(),
-            insertVersion, relationLogs, taskDefinitionLogs, Boolean.TRUE);
+                insertVersion, relationLogs, taskDefinitionLogs, Boolean.TRUE);
         if (insertResult == Constants.EXIT_CODE_SUCCESS) {
             putMsg(result, Status.SUCCESS);
             result.put(Constants.DATA_LIST, processDefinition);
@@ -433,11 +435,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
     /**
      * update task definition and upstream
      *
-     * @param loginUser             login user
-     * @param projectCode           project code
-     * @param taskCode              task definition code
+     * @param loginUser login user
+     * @param projectCode project code
+     * @param taskCode task definition code
      * @param taskDefinitionJsonObj task definition json object
-     * @param upstreamCodes         upstream task codes, sep comma
+     * @param upstreamCodes upstream task codes, sep comma
      * @return update result code
      */
     @Override
@@ -449,7 +451,10 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
         }
         List<ProcessTaskRelation> upstreamTaskRelations = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
         Set<Long> upstreamCodeSet = upstreamTaskRelations.stream().map(ProcessTaskRelation::getPreTaskCode).collect(Collectors.toSet());
-        Set<Long> upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
+        Set<Long> upstreamTaskCodes = Collections.emptySet();
+        if (StringUtils.isNotEmpty(upstreamCodes)) {
+            upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
+        }
         if (CollectionUtils.isEqualCollection(upstreamCodeSet, upstreamTaskCodes) && taskDefinitionToUpdate == null) {
             putMsg(result, Status.SUCCESS);
             return result;
@@ -642,7 +647,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
         }
         Page<TaskMainInfo> page = new Page<>(pageNo, pageSize);
         IPage<TaskMainInfo> taskMainInfoIPage = taskDefinitionMapper.queryDefineListPaging(page, projectCode, searchWorkflowName,
-            searchTaskName, taskType == null ? StringUtils.EMPTY : taskType.getDesc());
+                searchTaskName, taskType == null ? StringUtils.EMPTY : taskType.getDesc());
         List<TaskMainInfo> records = taskMainInfoIPage.getRecords();
         if (!records.isEmpty()) {
             Map<Long, TaskMainInfo> taskMainInfoMap = new HashMap<>();
@@ -740,11 +745,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
                 String resourceIds = taskDefinition.getResourceIds();
                 if (StringUtils.isNotBlank(resourceIds)) {
                     Integer[] resourceIdArray = Arrays.stream(resourceIds.split(",")).map(Integer::parseInt).toArray(Integer[]::new);
-                    PermissionCheck<Integer> permissionCheck = new PermissionCheck(AuthorizationType.RESOURCE_FILE_ID,processService,resourceIdArray,loginUser.getId(),logger);
+                    PermissionCheck<Integer> permissionCheck = new PermissionCheck(AuthorizationType.RESOURCE_FILE_ID, processService, resourceIdArray, loginUser.getId(), logger);
                     try {
                         permissionCheck.checkPermission();
                     } catch (Exception e) {
-                        logger.error(e.getMessage(),e);
+                        logger.error(e.getMessage(), e);
                         putMsg(result, Status.RESOURCE_NOT_EXIST_OR_NO_PERMISSION);
                         return result;
                     }