Selaa lähdekoodia

[Fix-6139][API] fix workFlowLineage relation (#6217)

* fix bug of view-tree api

* fix bug of view-tree api

* fix ut

* fix ut

* fix master buildFlowDag error

* fix workFlowLineage relation

* fix ut

* fix ut

* fix ut

Co-authored-by: JinyLeeChina <297062848@qq.com>
JinyLeeChina 3 vuotta sitten
vanhempi
commit
07d6542feb

+ 100 - 43
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java

@@ -20,20 +20,32 @@ package org.apache.dolphinscheduler.api.service.impl;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.WorkFlowLineageService;
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.model.DependentItem;
+import org.apache.dolphinscheduler.common.model.DependentTaskModel;
+import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
 import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
 import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.curator.shaded.com.google.common.collect.Sets;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -50,6 +62,9 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF
     @Autowired
     private ProjectMapper projectMapper;
 
+    @Autowired
+    private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
     @Override
     public Map<String, Object> queryWorkFlowLineageByName(long projectCode, String workFlowName) {
         Map<String, Object> result = new HashMap<>();
@@ -72,12 +87,47 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF
             putMsg(result, Status.PROJECT_NOT_FOUNT, projectCode);
             return result;
         }
-        WorkFlowLineage workFlowLineage = workFlowLineageMapper.queryWorkFlowLineageByCode(projectCode, workFlowCode);
-        result.put(Constants.DATA_LIST, workFlowLineage);
+        Map<Long, WorkFlowLineage> workFlowLineagesMap = new HashMap<>();
+        Set<WorkFlowRelation> workFlowRelations = new HashSet<>();
+        Set<Long> sourceWorkFlowCodes = Sets.newHashSet(workFlowCode);
+        recursiveWorkFlow(projectCode, workFlowLineagesMap, workFlowRelations, sourceWorkFlowCodes);
+        Map<String, Object> workFlowLists = new HashMap<>();
+        workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineagesMap.values());
+        workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, workFlowRelations);
+        result.put(Constants.DATA_LIST, workFlowLists);
         putMsg(result, Status.SUCCESS);
         return result;
     }
 
+    private void recursiveWorkFlow(long projectCode,
+                                   Map<Long, WorkFlowLineage> workFlowLineagesMap,
+                                   Set<WorkFlowRelation> workFlowRelations,
+                                   Set<Long> sourceWorkFlowCodes) {
+        for (Long workFlowCode : sourceWorkFlowCodes) {
+            WorkFlowLineage workFlowLineage = workFlowLineageMapper.queryWorkFlowLineageByCode(projectCode, workFlowCode);
+            workFlowLineagesMap.put(workFlowCode, workFlowLineage);
+            List<ProcessLineage> processLineages = workFlowLineageMapper.queryProcessLineageByCode(projectCode, workFlowCode);
+            List<TaskDefinition> taskDefinitionList = new ArrayList<>();
+            for (ProcessLineage processLineage : processLineages) {
+                if (processLineage.getPreTaskCode() > 0) {
+                    taskDefinitionList.add(new TaskDefinition(processLineage.getPreTaskCode(), processLineage.getPreTaskVersion()));
+                }
+                if (processLineage.getPostTaskCode() > 0) {
+                    taskDefinitionList.add(new TaskDefinition(processLineage.getPostTaskCode(), processLineage.getPostTaskVersion()));
+                }
+            }
+            sourceWorkFlowCodes = querySourceWorkFlowCodes(projectCode, workFlowCode, taskDefinitionList);
+            if (sourceWorkFlowCodes.isEmpty()) {
+                workFlowRelations.add(new WorkFlowRelation(0L, workFlowCode));
+                return;
+            } else {
+                workFlowLineagesMap.get(workFlowCode).setSourceWorkFlowCode(StringUtils.join(sourceWorkFlowCodes, Constants.COMMA));
+                sourceWorkFlowCodes.forEach(code -> workFlowRelations.add(new WorkFlowRelation(code, workFlowCode)));
+                recursiveWorkFlow(projectCode, workFlowLineagesMap, workFlowRelations, sourceWorkFlowCodes);
+            }
+        }
+    }
+
     @Override
     public Map<String, Object> queryWorkFlowLineage(long projectCode) {
         Map<String, Object> result = new HashMap<>();
@@ -87,58 +137,65 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF
             return result;
         }
         List<ProcessLineage> processLineages = workFlowLineageMapper.queryProcessLineage(projectCode);
-
-        Map<Long, WorkFlowLineage> workFlowLineages = new HashMap<>();
+        Map<Long, WorkFlowLineage> workFlowLineagesMap = new HashMap<>();
         Set<WorkFlowRelation> workFlowRelations = new HashSet<>();
-
-        for (ProcessLineage processLineage : processLineages) {
-            getRelation(workFlowLineages, workFlowRelations, processLineage);
+        if (!processLineages.isEmpty()) {
+            List<WorkFlowLineage> workFlowLineages = workFlowLineageMapper.queryWorkFlowLineageByLineage(processLineages);
+            workFlowLineagesMap = workFlowLineages.stream().collect(Collectors.toMap(WorkFlowLineage::getWorkFlowCode, workFlowLineage -> workFlowLineage));
+            Map<Long, List<TaskDefinition>> workFlowMap = new HashMap<>();
+            for (ProcessLineage processLineage : processLineages) {
+                workFlowMap.compute(processLineage.getProcessDefinitionCode(), (k, v) -> {
+                    if (v == null) {
+                        v = new ArrayList<>();
+                    }
+                    if (processLineage.getPreTaskCode() > 0) {
+                        v.add(new TaskDefinition(processLineage.getPreTaskCode(), processLineage.getPreTaskVersion()));
+                    }
+                    if (processLineage.getPostTaskCode() > 0) {
+                        v.add(new TaskDefinition(processLineage.getPostTaskCode(), processLineage.getPostTaskVersion()));
+                    }
+                    return v;
+                });
+            }
+            for (Entry<Long, List<TaskDefinition>> workFlow : workFlowMap.entrySet()) {
+                Set<Long> sourceWorkFlowCodes = querySourceWorkFlowCodes(projectCode, workFlow.getKey(), workFlow.getValue());
+                if (sourceWorkFlowCodes.isEmpty()) {
+                    workFlowRelations.add(new WorkFlowRelation(0L, workFlow.getKey()));
+                } else {
+                    workFlowLineagesMap.get(workFlow.getKey()).setSourceWorkFlowCode(StringUtils.join(sourceWorkFlowCodes, Constants.COMMA));
+                    sourceWorkFlowCodes.forEach(code -> workFlowRelations.add(new WorkFlowRelation(code, workFlow.getKey())));
+                }
+            }
         }
-
         Map<String, Object> workFlowLists = new HashMap<>();
-        workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineages.values());
+        workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineagesMap.values());
         workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, workFlowRelations);
         result.put(Constants.DATA_LIST, workFlowLists);
         putMsg(result, Status.SUCCESS);
         return result;
     }
 
-    private void getRelation(Map<Long, WorkFlowLineage> workFlowLineageMap,
-                             Set<WorkFlowRelation> workFlowRelations,
-                             ProcessLineage processLineage) {
-        List<ProcessLineage> relations = workFlowLineageMapper.queryCodeRelation(processLineage.getProjectCode(),
-            processLineage.getProcessDefinitionCode(), processLineage.getPostTaskCode(), processLineage.getPostTaskVersion());
-        if (!relations.isEmpty()) {
-            Set<Long> preWorkFlowCodes = new HashSet<>();
-            List<ProcessLineage> preRelations = workFlowLineageMapper.queryCodeRelation(processLineage.getProjectCode(),
-                processLineage.getProcessDefinitionCode(), processLineage.getPreTaskCode(), processLineage.getPreTaskVersion());
-            for (ProcessLineage preRelation : preRelations) {
-                preWorkFlowCodes.add(preRelation.getProcessDefinitionCode());
-            }
-            ProcessLineage postRelation = relations.get(0);
-            WorkFlowLineage post = workFlowLineageMapper.queryWorkFlowLineageByCode(postRelation.getProjectCode(), postRelation.getProcessDefinitionCode());
-            preWorkFlowCodes.remove(post.getWorkFlowCode());
-            if (!workFlowLineageMap.containsKey(post.getWorkFlowCode())) {
-                post.setSourceWorkFlowCode(StringUtils.join(preWorkFlowCodes, ","));
-                workFlowLineageMap.put(post.getWorkFlowCode(), post);
-            } else {
-                WorkFlowLineage workFlowLineage = workFlowLineageMap.get(post.getWorkFlowCode());
-                String sourceWorkFlowCode = workFlowLineage.getSourceWorkFlowCode();
-                if (StringUtils.isBlank(sourceWorkFlowCode)) {
-                    post.setSourceWorkFlowCode(StringUtils.join(preWorkFlowCodes, ","));
-                } else {
-                    if (!preWorkFlowCodes.isEmpty()) {
-                        workFlowLineage.setSourceWorkFlowCode(sourceWorkFlowCode + "," + StringUtils.join(preWorkFlowCodes, ","));
+    private Set<Long> querySourceWorkFlowCodes(long projectCode, long workFlowCode, List<TaskDefinition> taskDefinitionList) {
+        Set<Long> sourceWorkFlowCodes = new HashSet<>();
+        List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionList);
+        for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
+            if (taskDefinitionLog.getProjectCode() == projectCode) {
+                if (taskDefinitionLog.getTaskType().equals(TaskType.DEPENDENT.getDesc())) {
+                    DependentParameters dependentParameters = JSONUtils.parseObject(taskDefinitionLog.getDependence(), DependentParameters.class);
+                    if (dependentParameters != null) {
+                        List<DependentTaskModel> dependTaskList = dependentParameters.getDependTaskList();
+                        for (DependentTaskModel taskModel : dependTaskList) {
+                            List<DependentItem> dependItemList = taskModel.getDependItemList();
+                            for (DependentItem dependentItem : dependItemList) {
+                                if (dependentItem.getProjectCode() == projectCode && dependentItem.getDefinitionCode() != workFlowCode) {
+                                    sourceWorkFlowCodes.add(dependentItem.getDefinitionCode());
+                                }
+                            }
+                        }
                     }
                 }
             }
-            if (preWorkFlowCodes.isEmpty()) {
-                workFlowRelations.add(new WorkFlowRelation(0L, post.getWorkFlowCode()));
-            } else {
-                for (long workFlowCode : preWorkFlowCodes) {
-                    workFlowRelations.add(new WorkFlowRelation(workFlowCode, post.getWorkFlowCode()));
-                }
-            }
         }
+        return sourceWorkFlowCodes;
     }
 }

+ 10 - 7
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java

@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
 import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
 
 import java.util.ArrayList;
@@ -57,6 +58,9 @@ public class WorkFlowLineageServiceTest {
     @Mock
     private ProjectMapper projectMapper;
 
+    @Mock
+    private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
     /**
      * get mock Project
      *
@@ -97,23 +101,22 @@ public class WorkFlowLineageServiceTest {
         processLineage.setProcessDefinitionVersion(1);
         processLineage.setProjectCode(1111L);
         processLineages.add(processLineage);
-
         WorkFlowLineage workFlowLineage = new WorkFlowLineage();
         workFlowLineage.setSourceWorkFlowCode("");
+        workFlowLineage.setWorkFlowCode(1111L);
+        List<WorkFlowLineage> workFlowLineages = new ArrayList<>();
+        workFlowLineages.add(workFlowLineage);
 
         when(projectMapper.queryByCode(1L)).thenReturn(project);
         when(workFlowLineageMapper.queryProcessLineage(project.getCode())).thenReturn(processLineages);
-        when(workFlowLineageMapper.queryCodeRelation(processLineage.getProjectCode(), processLineage.getProcessDefinitionCode(),
-            processLineage.getPostTaskCode(), processLineage.getPreTaskVersion())).thenReturn(processLineages);
-        when(workFlowLineageMapper.queryWorkFlowLineageByCode(processLineage.getProjectCode(), processLineage.getProcessDefinitionCode()))
-            .thenReturn(workFlowLineage);
+        when(workFlowLineageMapper.queryWorkFlowLineageByLineage(processLineages)).thenReturn(workFlowLineages);
 
         Map<String, Object> result = workFlowLineageService.queryWorkFlowLineage(1L);
 
         Map<String, Object> workFlowLists = (Map<String, Object>) result.get(Constants.DATA_LIST);
-        Collection<WorkFlowLineage> workFlowLineages = (Collection<WorkFlowLineage>) workFlowLists.get(Constants.WORKFLOW_LIST);
+        Collection<WorkFlowLineage> workFlowLineageList = (Collection<WorkFlowLineage>) workFlowLists.get(Constants.WORKFLOW_LIST);
         Set<WorkFlowRelation> workFlowRelations = (Set<WorkFlowRelation>) workFlowLists.get(Constants.WORKFLOW_RELATION_LIST);
-        Assert.assertTrue(workFlowLineages.size() > 0);
+        Assert.assertTrue(workFlowLineageList.size() > 0);
         Assert.assertTrue(workFlowRelations.size() > 0);
     }
 

+ 14 - 6
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java

@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.common.model;
 
 import org.apache.dolphinscheduler.common.enums.DependResult;
@@ -23,16 +24,15 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
  * dependent item
  */
 public class DependentItem {
-
-    private Long definitionCode;
+    private long projectCode;
+    private long definitionCode;
     private String depTasks;
     private String cycle;
     private String dateValue;
     private DependResult dependResult;
     private ExecutionStatus status;
 
-
-    public String getKey(){
+    public String getKey() {
         return String.format("%d-%s-%s-%s",
                 getDefinitionCode(),
                 getDepTasks(),
@@ -40,11 +40,19 @@ public class DependentItem {
                 getDateValue());
     }
 
-    public Long getDefinitionCode() {
+    public long getProjectCode() {
+        return projectCode;
+    }
+
+    public void setProjectCode(long projectCode) {
+        this.projectCode = projectCode;
+    }
+
+    public long getDefinitionCode() {
         return definitionCode;
     }
 
-    public void setDefinitionCode(Long definitionCode) {
+    public void setDefinitionCode(long definitionCode) {
         this.definitionCode = definitionCode;
     }
 

+ 19 - 0
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java

@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.dao.entity;
 
 import java.util.Objects;
@@ -45,4 +46,22 @@ public class WorkFlowRelation {
         this.sourceWorkFlowCode = sourceWorkFlowCode;
         this.targetWorkFlowCode = targetWorkFlowCode;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        WorkFlowRelation that = (WorkFlowRelation) o;
+        return sourceWorkFlowCode == that.sourceWorkFlowCode
+            && targetWorkFlowCode == that.targetWorkFlowCode;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(sourceWorkFlowCode, targetWorkFlowCode);
+    }
 }

+ 12 - 6
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java

@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.dao.mapper;
 
 import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
@@ -43,6 +44,14 @@ public interface WorkFlowLineageMapper {
      */
     WorkFlowLineage queryWorkFlowLineageByCode(@Param("projectCode") long projectCode, @Param("workFlowCode") long workFlowCode);
 
+    /**
+     * queryWorkFlowLineageByCode
+     *
+     * @param processLineages processLineages
+     * @return WorkFlowLineage list
+     */
+    List<WorkFlowLineage> queryWorkFlowLineageByLineage(@Param("processLineages") List<ProcessLineage> processLineages);
+
     /**
      * queryProcessLineage
      *
@@ -54,13 +63,10 @@ public interface WorkFlowLineageMapper {
     /**
      * queryCodeRelation
      *
-     * @param taskCode taskCode
-     * @param taskVersion taskVersion
+     * @param projectCode projectCode
      * @param processDefinitionCode processDefinitionCode
      * @return ProcessLineage list
      */
-    List<ProcessLineage> queryCodeRelation(@Param("projectCode") long projectCode,
-                                           @Param("processDefinitionCode") long processDefinitionCode,
-                                           @Param("taskCode") long taskCode,
-                                           @Param("taskVersion") int taskVersion);
+    List<ProcessLineage> queryProcessLineageByCode(@Param("projectCode") long projectCode,
+                                                   @Param("processDefinitionCode") long processDefinitionCode);
 }

+ 21 - 3
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml

@@ -42,6 +42,26 @@
         where tepd.project_code = #{projectCode} and tepd.code = #{workFlowCode}
     </select>
 
+    <select id="queryWorkFlowLineageByLineage" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
+        select tepd.code as work_flow_code,tepd.name as work_flow_name,
+               "" as source_work_flow_code,
+                tepd.release_state as work_flow_publish_status,
+                tes.start_time as schedule_start_time,
+                tes.end_time as schedule_end_time,
+                tes.crontab as crontab,
+                tes.release_state as schedule_publish_status
+        from t_ds_process_definition tepd
+        left join t_ds_schedules tes on tepd.code = tes.process_definition_code
+        where 1=1
+        <if test="processLineages != null and processLineages.size != 0">
+            and
+            <foreach collection="processLineages" index="index" item="item" open="(" separator=" or " close=")">
+                (tepd.project_code = #{item.projectCode}
+                and tepd.code = #{item.processDefinitionCode})
+            </foreach>
+        </if>
+    </select>
+
     <select id="queryProcessLineage" resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
     select ptr.project_code,
         ptr.post_task_code,
@@ -56,7 +76,7 @@
         where pd.project_code = #{projectCode}
     </select>
 
-    <select id="queryCodeRelation" resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
+    <select id="queryProcessLineageByCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
         select project_code,
                post_task_code,
                post_task_version,
@@ -67,7 +87,5 @@
          from t_ds_process_task_relation
          where project_code = #{projectCode}
                 and process_definition_code = #{processDefinitionCode}
-                and post_task_code = #{taskCode}
-                and post_task_version = #{taskVersion}
     </select>
 </mapper>

+ 2 - 2
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java

@@ -125,8 +125,8 @@ public class WorkFlowLineageMapperTest {
     @Test
     public void testQueryCodeRelation() {
         ProcessTaskRelation processTaskRelation = insertOneProcessTaskRelation();
-        List<ProcessLineage> workFlowLineages = workFlowLineageMapper.queryCodeRelation(processTaskRelation.getProjectCode(),
-            processTaskRelation.getProcessDefinitionCode(), processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion());
+        List<ProcessLineage> workFlowLineages = workFlowLineageMapper.queryProcessLineageByCode(processTaskRelation.getProjectCode(),
+            processTaskRelation.getProcessDefinitionCode());
         Assert.assertNotEquals(workFlowLineages.size(), 0);
     }
 

+ 0 - 1
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@@ -44,7 +44,6 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
 import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.DateInterval;
-import org.apache.dolphinscheduler.common.model.PreviousTaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
 import org.apache.dolphinscheduler.common.process.ProcessDag;