Browse Source

[Fix-8616][WorkFlowLineage] work flow lineage search result missing data (#8684)

* fix bug_8616

* remove meaningless query column

* fix code smell

* fix remaining problems
xiangzihao 3 years ago
parent
commit
2ab8c1ca7d

+ 0 - 1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java

@@ -216,7 +216,6 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
         pageInfo.setTotalList(list);
 
         result.put(Constants.DATA_LIST, pageInfo);
-        logger.info("select result:{}", taskGroupPaging);
         putMsg(result, Status.SUCCESS);
         return result;
     }

+ 48 - 30
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java

@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.model.DependentItem;
 import org.apache.dolphinscheduler.common.model.DependentTaskModel;
 import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@@ -36,8 +37,6 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
-import org.apache.curator.shaded.com.google.common.collect.Sets;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -80,19 +79,18 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF
     }
 
     @Override
-    public Map<String, Object> queryWorkFlowLineageByCode(long projectCode, long workFlowCode) {
+    public Map<String, Object> queryWorkFlowLineageByCode(long projectCode, long sourceWorkFlowCode) {
         Map<String, Object> result = new HashMap<>();
         Project project = projectMapper.queryByCode(projectCode);
         if (project == null) {
             putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
             return result;
         }
-        Map<Long, WorkFlowLineage> workFlowLineagesMap = new HashMap<>();
+        List<WorkFlowLineage> workFlowLineages = new ArrayList<>();
         Set<WorkFlowRelation> workFlowRelations = new HashSet<>();
-        Set<Long> sourceWorkFlowCodes = Sets.newHashSet(workFlowCode);
-        recursiveWorkFlow(projectCode, workFlowLineagesMap, workFlowRelations, sourceWorkFlowCodes);
+        recursiveWorkFlow(projectCode, sourceWorkFlowCode, workFlowLineages, workFlowRelations);
         Map<String, Object> workFlowLists = new HashMap<>();
-        workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineagesMap.values());
+        workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineages);
         workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, workFlowRelations);
         result.put(Constants.DATA_LIST, workFlowLists);
         putMsg(result, Status.SUCCESS);
@@ -100,31 +98,51 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF
     }
 
     private void recursiveWorkFlow(long projectCode,
-                                   Map<Long, WorkFlowLineage> workFlowLineagesMap,
-                                   Set<WorkFlowRelation> workFlowRelations,
-                                   Set<Long> sourceWorkFlowCodes) {
-        for (Long workFlowCode : sourceWorkFlowCodes) {
-            WorkFlowLineage workFlowLineage = workFlowLineageMapper.queryWorkFlowLineageByCode(projectCode, workFlowCode);
-            workFlowLineagesMap.put(workFlowCode, workFlowLineage);
-            List<ProcessLineage> processLineages = workFlowLineageMapper.queryProcessLineageByCode(projectCode, workFlowCode);
-            List<TaskDefinition> taskDefinitionList = new ArrayList<>();
-            for (ProcessLineage processLineage : processLineages) {
-                if (processLineage.getPreTaskCode() > 0) {
-                    taskDefinitionList.add(new TaskDefinition(processLineage.getPreTaskCode(), processLineage.getPreTaskVersion()));
-                }
-                if (processLineage.getPostTaskCode() > 0) {
-                    taskDefinitionList.add(new TaskDefinition(processLineage.getPostTaskCode(), processLineage.getPostTaskVersion()));
+                                   long sourceWorkFlowCode,
+                                   List<WorkFlowLineage> workFlowLineages,
+                                   Set<WorkFlowRelation> workFlowRelations) {
+        workFlowLineages.add(workFlowLineageMapper.queryWorkFlowLineageByCode(projectCode,sourceWorkFlowCode));
+
+        List<WorkFlowLineage> downStreamWorkFlowLineages =
+                workFlowLineageMapper.queryDownstreamLineageByProcessDefinitionCode(sourceWorkFlowCode, "DEPENDENT");
+        workFlowLineages.addAll(downStreamWorkFlowLineages);
+        downStreamWorkFlowLineages.forEach(workFlowLineage -> workFlowRelations.add(new WorkFlowRelation(sourceWorkFlowCode, workFlowLineage.getWorkFlowCode())));
+
+        List<WorkFlowLineage> upstreamWorkFlowLineages = new ArrayList<>();
+        getUpstreamLineages(sourceWorkFlowCode, upstreamWorkFlowLineages);
+        workFlowLineages.addAll(upstreamWorkFlowLineages);
+        upstreamWorkFlowLineages.forEach(workFlowLineage -> workFlowRelations.add(new WorkFlowRelation(workFlowLineage.getWorkFlowCode(), sourceWorkFlowCode)));
+    }
+
+    private void getUpstreamLineages(long sourceWorkFlowCode,
+                                     List<WorkFlowLineage> upstreamWorkFlowLineages) {
+        List<DependentProcessDefinition> workFlowDependentDefinitionList =
+                workFlowLineageMapper.queryUpstreamDependentParamsByProcessDefinitionCode(sourceWorkFlowCode, "DEPENDENT");
+
+        List<Long> upstreamProcessDefinitionCodes = new ArrayList<>();
+
+        getProcessDefinitionCodeByDependentDefinitionList(workFlowDependentDefinitionList,
+                upstreamProcessDefinitionCodes);
+
+        if (!upstreamProcessDefinitionCodes.isEmpty()) {
+            upstreamWorkFlowLineages.addAll(
+                    workFlowLineageMapper.queryWorkFlowLineageByProcessDefinitionCodes(upstreamProcessDefinitionCodes));
+        }
+    }
+
+    /**
+     * get dependent process definition code by dependent process definition list
+     */
+    private void getProcessDefinitionCodeByDependentDefinitionList(List<DependentProcessDefinition> dependentDefinitionList,
+                                                                   List<Long> processDefinitionCodes) {
+        for (DependentProcessDefinition dependentProcessDefinition : dependentDefinitionList) {
+            for (DependentTaskModel dependentTaskModel : dependentProcessDefinition.getDependentParameters().getDependTaskList()) {
+                for (DependentItem dependentItem : dependentTaskModel.getDependItemList()) {
+                    if (!processDefinitionCodes.contains(dependentItem.getDefinitionCode())) {
+                        processDefinitionCodes.add(dependentItem.getDefinitionCode());
+                    }
                 }
             }
-            sourceWorkFlowCodes = querySourceWorkFlowCodes(projectCode, workFlowCode, taskDefinitionList);
-            if (sourceWorkFlowCodes.isEmpty()) {
-                workFlowRelations.add(new WorkFlowRelation(0L, workFlowCode));
-                return;
-            } else {
-                workFlowLineagesMap.get(workFlowCode).setSourceWorkFlowCode(StringUtils.join(sourceWorkFlowCodes, Constants.COMMA));
-                sourceWorkFlowCodes.forEach(code -> workFlowRelations.add(new WorkFlowRelation(code, workFlowCode)));
-                recursiveWorkFlow(projectCode, workFlowLineagesMap, workFlowRelations, sourceWorkFlowCodes);
-            }
         }
     }
 

+ 26 - 1
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java

@@ -45,6 +45,14 @@ public interface WorkFlowLineageMapper {
      */
     WorkFlowLineage queryWorkFlowLineageByCode(@Param("projectCode") long projectCode, @Param("workFlowCode") long workFlowCode);
 
+    /**
+     * queryWorkFlowLineageByProcessDefinitionCodes
+     *
+     * @param workFlowCodes workFlowCodes
+     * @return WorkFlowLineage
+     */
+    List<WorkFlowLineage> queryWorkFlowLineageByProcessDefinitionCodes(@Param("workFlowCodes") List<Long> workFlowCodes);
+
     /**
      * queryWorkFlowLineageByCode
      *
@@ -71,11 +79,28 @@ public interface WorkFlowLineageMapper {
     List<ProcessLineage> queryProcessLineageByCode(@Param("projectCode") long projectCode,
                                                    @Param("processDefinitionCode") long processDefinitionCode);
 
-
     /**
      * query process definition by name
      *
      * @return dependent process definition
      */
     List<DependentProcessDefinition> queryDependentProcessDefinitionByProcessDefinitionCode(@Param("code") long code);
+
+    /**
+     * query downstream work flow lineage by process definition code
+     *
+     * @return dependent process definition
+     */
+    List<WorkFlowLineage> queryDownstreamLineageByProcessDefinitionCode(@Param("code") long code,
+                                                                        @Param("taskType") String taskType);
+
+
+    /**
+     * query upstream work flow dependent task params by process definition code
+     *
+     * @return task_params
+     */
+    List<DependentProcessDefinition> queryUpstreamDependentParamsByProcessDefinitionCode(@Param("code") long code,
+                                                                                         @Param("taskType") String taskType);
+
 }

+ 56 - 0
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml

@@ -42,6 +42,25 @@
         where tepd.project_code = #{projectCode} and tepd.code = #{workFlowCode}
     </select>
 
+    <select id="queryWorkFlowLineageByProcessDefinitionCodes" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
+        select tepd.code as work_flow_code,
+               tepd.name as work_flow_name,
+               tepd.release_state as work_flow_publish_status,
+               tes.start_time as schedule_start_time,
+               tes.end_time as schedule_end_time,
+               tes.crontab as crontab,
+               tes.release_state as schedule_publish_status
+        from t_ds_process_definition tepd
+        left join t_ds_schedules tes on tepd.code = tes.process_definition_code
+        where 1=1
+        <if test="workFlowCodes != null and workFlowCodes.size() != 0 ">
+            and tepd.code in
+            <foreach collection="workFlowCodes" item="code" index="index" open="(" close=")" separator=",">
+                #{code}
+            </foreach>
+        </if>
+    </select>
+
     <select id="queryWorkFlowLineageByLineage" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
         select tepd.code as work_flow_code,tepd.name as work_flow_name,
                '' as source_work_flow_code,
@@ -89,6 +108,43 @@
                 and process_definition_code = #{processDefinitionCode}
     </select>
 
+    <select id="queryUpstreamDependentParamsByProcessDefinitionCode"
+            resultType="org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition">
+        SELECT
+            DISTINCT c.task_params
+        FROM
+        t_ds_process_definition a
+        JOIN t_ds_process_task_relation b ON a.code = b.process_definition_code AND a.version = b.process_definition_version AND a.project_code = b.project_code
+        JOIN t_ds_task_definition c ON c.code = b.pre_task_code and c.version = b.pre_task_version
+        WHERE 1=1
+        AND a.code = #{code}
+        AND c.task_type = #{taskType}
+        ;
+    </select>
+
+    <select id="queryDownstreamLineageByProcessDefinitionCode"
+            resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
+        SELECT
+            c.code AS work_flow_code
+            ,c.name AS work_flow_name
+            ,c.release_state AS work_flow_publish_status
+            ,d.start_time AS schedule_start_time
+            ,d.end_time AS schedule_end_time
+            ,d.crontab AS crontab
+            ,d.release_state AS schedule_publish_status
+            ,'' AS source_work_flow_code
+        FROM
+        t_ds_task_definition a
+        JOIN t_ds_process_task_relation b ON a.code	= b.pre_task_code and a.version = b.pre_task_version
+        JOIN t_ds_process_definition c ON c.code = b.process_definition_code AND c.version = b.process_definition_version AND c.project_code = b.project_code
+        LEFT JOIN t_ds_schedules d ON d.process_definition_code = c.code
+        WHERE 1=1
+        <if test="code != null and code != ''">
+            AND a.task_params LIKE concat('%', #{code}, '%')
+        </if>
+        AND a.task_type = #{taskType}
+    </select>
+
     <select id="queryDependentProcessDefinitionByProcessDefinitionCode" resultType="DependentProcessDefinition">
         SELECT
         c.code AS process_definition_code