Browse Source

add task save and binds workflow (#7772)

JinYong Li 3 years ago
parent
commit
c97170853e

+ 30 - 0
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java

@@ -95,6 +95,36 @@ public class TaskDefinitionController extends BaseController {
         return returnDataList(result);
     }
 
+    /**
+     * create single task definition that binds the workflow
+     *
+     * @param loginUser             login user
+     * @param projectCode           project code
+     * @param processDefinitionCode process definition code
+     * @param taskDefinitionJsonObj task definition json object
+     * @param upstreamCodes         upstream task codes, sep comma
+     * @return create result code
+     */
+    @ApiOperation(value = "saveSingle", notes = "CREATE_SINGLE_TASK_DEFINITION_NOTES")
+    @ApiImplicitParams({
+        @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"),
+        @ApiImplicitParam(name = "processDefinitionCode", value = "PROCESS_DEFINITION_CODE", required = true, type = "processDefinitionCode"),
+        @ApiImplicitParam(name = "taskDefinitionJsonObj", value = "TASK_DEFINITION_JSON", required = true, type = "String"),
+        @ApiImplicitParam(name = "upstreamCodes", value = "UPSTREAM_CODES", required = false, type = "String")
+    })
+    @PostMapping("/save-single")
+    @ResponseStatus(HttpStatus.CREATED)
+    @ApiException(CREATE_TASK_DEFINITION_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result createTaskBindsWorkFlow(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                          @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
+                                          @RequestParam(value = "processDefinitionCode", required = true) long processDefinitionCode,
+                                          @RequestParam(value = "taskDefinitionJsonObj", required = true) String taskDefinitionJsonObj,
+                                          @RequestParam(value = "upstreamCodes", required = false) String upstreamCodes) {
+        Map<String, Object> result = taskDefinitionService.createTaskBindsWorkFlow(loginUser, projectCode, processDefinitionCode, taskDefinitionJsonObj, upstreamCodes);
+        return returnDataList(result);
+    }
+
     /**
      * update task definition
      *

+ 16 - 0
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java

@@ -40,6 +40,22 @@ public interface TaskDefinitionService {
                                              long projectCode,
                                              String taskDefinitionJson);
 
+    /**
+     * create single task definition that binds the workflow
+     *
+     * @param loginUser             login user
+     * @param projectCode           project code
+     * @param processDefinitionCode process definition code
+     * @param taskDefinitionJsonObj task definition json object
+     * @param upstreamCodes         upstream task codes, sep comma
+     * @return create result code
+     */
+    Map<String, Object> createTaskBindsWorkFlow(User loginUser,
+                                                long projectCode,
+                                                long processDefinitionCode,
+                                                String taskDefinitionJsonObj,
+                                                String upstreamCodes);
+
     /**
      * query task definition
      *

+ 91 - 3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
 import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
 import org.apache.dolphinscheduler.dao.entity.Project;
@@ -39,16 +40,16 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
 import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.UserMapper;
 import org.apache.dolphinscheduler.service.permission.PermissionCheck;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -101,7 +102,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
     private ProcessService processService;
 
     @Autowired
-    private UserMapper userMapper;
+    private ProcessDefinitionMapper processDefinitionMapper;
 
     /**
      * create task definition
@@ -148,6 +149,93 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
         return result;
     }
 
+    /**
+     * create single task definition that binds the workflow
+     *
+     * @param loginUser             login user
+     * @param projectCode           project code
+     * @param processDefinitionCode process definition code
+     * @param taskDefinitionJsonObj task definition json object
+     * @param upstreamCodes         upstream task codes, sep comma
+     * @return create result code
+     */
+    @Transactional(rollbackFor = RuntimeException.class)
+    @Override
+    public Map<String, Object> createTaskBindsWorkFlow(User loginUser,
+                                                       long projectCode,
+                                                       long processDefinitionCode,
+                                                       String taskDefinitionJsonObj,
+                                                       String upstreamCodes) {
+        Project project = projectMapper.queryByCode(projectCode);
+        //check user access for project
+        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
+        if (result.get(Constants.STATUS) != Status.SUCCESS) {
+            return result;
+        }
+        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
+        if (processDefinition == null) {
+            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
+            return result;
+        }
+        TaskDefinitionLog taskDefinition = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
+        if (taskDefinition == null) {
+            logger.error("taskDefinitionJsonObj is not valid json");
+            putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj);
+            return result;
+        }
+        if (!CheckUtils.checkTaskDefinitionParameters(taskDefinition)) {
+            logger.error("task definition {} parameter invalid", taskDefinition.getName());
+            putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinition.getName());
+            return result;
+        }
+        long taskCode = taskDefinition.getCode();
+        if (taskCode == 0) {
+            try {
+                taskCode = CodeGenerateUtils.getInstance().genCode();
+                taskDefinition.setCode(taskCode);
+            } catch (CodeGenerateException e) {
+                logger.error("Task code get error, ", e);
+                putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, taskDefinitionJsonObj);
+                return result;
+            }
+        }
+        if (StringUtils.isNotBlank(upstreamCodes)) {
+            Set<Long> upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
+            List<TaskDefinition> upstreamTaskDefinitionList = taskDefinitionMapper.queryByCodeList(upstreamTaskCodes);
+            Set<Long> queryUpStreamTaskCodes = upstreamTaskDefinitionList.stream().map(TaskDefinition::getCode).collect(Collectors.toSet());
+            // upstreamTaskCodes - queryUpStreamTaskCodes
+            Set<Long> diffCode = upstreamTaskCodes.stream().filter(code -> !queryUpStreamTaskCodes.contains(code)).collect(Collectors.toSet());
+            if (!diffCode.isEmpty()) {
+                putMsg(result, Status.TASK_DEFINE_NOT_EXIST, StringUtils.join(diffCode, Constants.COMMA));
+                return result;
+            }
+            List<ProcessTaskRelationLog> processTaskRelationLogList = Lists.newArrayList();
+            for (TaskDefinition upstreamTask : upstreamTaskDefinitionList) {
+                ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
+                processTaskRelationLog.setPreTaskCode(upstreamTask.getCode());
+                processTaskRelationLog.setPreTaskVersion(upstreamTask.getVersion());
+                processTaskRelationLog.setPostTaskCode(taskCode);
+                processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
+                processTaskRelationLogList.add(processTaskRelationLog);
+            }
+            int insertResult = processService.saveTaskRelation(loginUser, projectCode, processDefinition.getCode(), processDefinition.getVersion(),
+                processTaskRelationLogList, null);
+            if (insertResult == Constants.EXIT_CODE_SUCCESS) {
+                putMsg(result, Status.SUCCESS);
+                result.put(Constants.DATA_LIST, processDefinition);
+            } else {
+                putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+                throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+            }
+        }
+        int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, Lists.newArrayList(taskDefinition));
+        if (saveTaskResult == Constants.DEFINITION_FAILURE) {
+            putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
+            throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
+        }
+        return result;
+    }
+
     /**
      * query task definition
      *

+ 9 - 0
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java

@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
 import org.apache.ibatis.annotations.MapKey;
 import org.apache.ibatis.annotations.Param;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
@@ -117,4 +118,12 @@ public interface TaskDefinitionMapper extends BaseMapper<TaskDefinition> {
                                               @Param("searchWorkflowName") String searchWorkflowName,
                                               @Param("searchTaskName") String searchTaskName,
                                               @Param("taskType") String taskType);
+
+    /**
+     * query task definition by code list
+     *
+     * @param codes taskDefinitionCode list
+     * @return task definition list
+     */
+    List<TaskDefinition> queryByCodeList(@Param("codes") Collection<Long> codes);
 }

+ 12 - 0
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml

@@ -106,4 +106,16 @@
         </if>
         order by td.update_time desc
     </select>
+    <select id="queryByCodeList" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
+        select
+        <include refid="baseSql"/>
+        from t_ds_task_definition
+        where 1 = 1
+        <if test="codes != null and codes.size != 0">
+            and code in
+            <foreach collection="codes" index="index" item="i" open="(" separator="," close=")">
+                #{i}
+            </foreach>
+        </if>
+    </select>
 </mapper>