Parcourir la source

#2282 fix workflow dependent bug (#2329)

* fix workflow dependent bug

* fix workflow dependent bug 2

* fix workflow dependent bug 2

Co-authored-by: sunchaohe <sunzhaohe@linklogis.com>
Co-authored-by: dailidong <dailidong66@gmail.com>
zixi0825 il y a 5 ans
Parent
commit
69e000b542

+ 1 - 1
dolphinscheduler-dao/src/main/resources/application.properties

@@ -22,7 +22,7 @@ spring.datasource.driver-class-name=org.postgresql.Driver
 spring.datasource.url=jdbc:postgresql://localhost:5432/dolphinscheduler
 # mysql
 #spring.datasource.driver-class-name=com.mysql.jdbc.Driver
-#spring.datasource.url=jdbc:mysql://192.168.xx.xx:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
+#spring.datasource.url=jdbc:mysql://localhost:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
 # h2
 #spring.datasource.driver-class-name=org.h2.Driver
 #spring.datasource.url=jdbc:h2:file:../sql/h2;AUTO_SERVER=TRUE

+ 77 - 18
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java

@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.DependentRelation;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.model.DateInterval;
 import org.apache.dolphinscheduler.common.model.DependentItem;
+import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.utils.DependentUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -82,7 +83,7 @@ public class DependentExecute {
      * @param currentTime   current time
      * @return DependResult
      */
-    public DependResult getDependentResultForItem(DependentItem dependentItem, Date currentTime){
+    private DependResult getDependentResultForItem(DependentItem dependentItem, Date currentTime){
         List<DateInterval> dateIntervals = DependentUtils.getDateIntervalList(currentTime, dependentItem.getDateValue());
         return calculateResultForTasks(dependentItem, dateIntervals );
     }
@@ -94,7 +95,8 @@ public class DependentExecute {
      * @return dateIntervals
      */
     private DependResult calculateResultForTasks(DependentItem dependentItem,
-                                                        List<DateInterval> dateIntervals) {
+                                                 List<DateInterval> dateIntervals) {
+
         DependResult result = DependResult.FAILED;
         for(DateInterval dateInterval : dateIntervals){
             ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(),
@@ -104,25 +106,35 @@ public class DependentExecute {
                        dependentItem.getDefinitionId(), dateInterval.getStartTime(), dateInterval.getEndTime() );
                 return DependResult.FAILED;
             }
+            // need to check workflow for updates, so get all task and check the task state
             if(dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)){
-                result = getDependResultByState(processInstance.getState());
-            }else{
-                TaskInstance taskInstance = null;
-                List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
+                List<TaskNode> taskNodes =
+                        processService.getTaskNodeListByDefinitionId(dependentItem.getDefinitionId());
 
-                for(TaskInstance task : taskInstanceList){
-                    if(task.getName().equals(dependentItem.getDepTasks())){
-                        taskInstance = task;
-                        break;
+                if(taskNodes != null && taskNodes.size() > 0){
+                    List<DependResult> results = new ArrayList<>();
+                    DependResult tmpResult =  DependResult.FAILED;
+                    for(TaskNode taskNode:taskNodes){
+                        tmpResult = getDependTaskResult(taskNode.getName(),processInstance);
+                        if(DependResult.FAILED == tmpResult){
+                            break;
+                        }else{
+                            results.add(getDependTaskResult(taskNode.getName(),processInstance));
+                        }
+                    }
+
+                    if(DependResult.FAILED == tmpResult){
+                        result = DependResult.FAILED;
+                    }else if(results.contains(DependResult.WAITING)){
+                        result = DependResult.WAITING;
+                    }else{
+                        result =  DependResult.SUCCESS;
                     }
-                }
-                if(taskInstance == null){
-                    // cannot find task in the process instance
-                    // maybe because process instance is running or failed.
-                     result = getDependResultByState(processInstance.getState());
                 }else{
-                    result = getDependResultByState(taskInstance.getState());
+                    result = DependResult.FAILED;
                 }
+            }else{
+                result = getDependTaskResult(dependentItem.getDepTasks(),processInstance);
             }
             if(result != DependResult.SUCCESS){
                 break;
@@ -131,6 +143,35 @@ public class DependentExecute {
         return result;
     }
 
+    /**
+     * get depend task result
+     * @param taskName
+     * @param processInstance
+     * @return
+     */
+    private DependResult getDependTaskResult(String taskName, ProcessInstance processInstance) {
+        DependResult result = DependResult.FAILED;
+        TaskInstance taskInstance = null;
+        List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
+
+        for(TaskInstance task : taskInstanceList){
+            if(task.getName().equals(taskName)){
+                taskInstance = task;
+                break;
+            }
+        }
+
+        if(taskInstance == null){
+            // cannot find task in the process instance
+            // maybe because process instance is running or failed.
+            result = getDependResultByProcessStateWhenTaskNull(processInstance.getState());
+        }else{
+            result = getDependResultByState(taskInstance.getState());
+        }
+
+        return result;
+    }
+
     /**
      * find the last one process instance that :
      * 1. manual run and finish between the interval
@@ -172,7 +213,9 @@ public class DependentExecute {
      */
     private DependResult getDependResultByState(ExecutionStatus state) {
 
-        if(state.typeIsRunning() || state == ExecutionStatus.SUBMITTED_SUCCESS || state == ExecutionStatus.WAITTING_THREAD){
+        if(state.typeIsRunning()
+                || state == ExecutionStatus.SUBMITTED_SUCCESS
+                || state == ExecutionStatus.WAITTING_THREAD){
             return DependResult.WAITING;
         }else if(state.typeIsSuccess()){
             return DependResult.SUCCESS;
@@ -181,6 +224,22 @@ public class DependentExecute {
         }
     }
 
+    /**
+     * get dependent result by task instance state when task instance is null
+     * @param state state
+     * @return DependResult
+     */
+    private DependResult getDependResultByProcessStateWhenTaskNull(ExecutionStatus state) {
+
+        if(state.typeIsRunning()
+                || state == ExecutionStatus.SUBMITTED_SUCCESS
+                || state == ExecutionStatus.WAITTING_THREAD){
+            return DependResult.WAITING;
+        }else{
+            return DependResult.FAILED;
+        }
+    }
+
     /**
      * judge depend item finished
      * @param currentTime current time
@@ -222,7 +281,7 @@ public class DependentExecute {
      * @param currentTime   current time
      * @return DependResult
      */
-    public DependResult getDependResultForItem(DependentItem item, Date currentTime){
+    private DependResult getDependResultForItem(DependentItem item, Date currentTime){
         String key = item.getKey();
         if(dependResultMap.containsKey(key)){
             return dependResultMap.get(key);

+ 5 - 4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java

@@ -82,10 +82,11 @@ public class DependentTask extends AbstractTask {
 
         this.dependentParameters = JSONUtils.parseObject(this.taskProps.getDependence(),
                 DependentParameters.class);
-
-        for(DependentTaskModel taskModel : dependentParameters.getDependTaskList()){
-            this.dependentTaskList.add(new DependentExecute(
-                            taskModel.getDependItemList(), taskModel.getRelation()));
+        if(dependentParameters != null){
+            for(DependentTaskModel taskModel : dependentParameters.getDependTaskList()){
+                this.dependentTaskList.add(new DependentExecute(
+                        taskModel.getDependItemList(), taskModel.getRelation()));
+            }
         }
 
         this.processService = SpringApplicationContext.getBean(ProcessService.class);

+ 79 - 20
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java

@@ -17,47 +17,106 @@
 package org.apache.dolphinscheduler.server.worker.task.dependent;
 
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.model.DateInterval;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.utils.dependent.DependentDateUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.server.worker.task.TaskProps;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationContext;
 
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+@RunWith(MockitoJUnitRunner.Silent.class)
 public class DependentTaskTest {
 
     private static final Logger logger = LoggerFactory.getLogger(DependentTaskTest.class);
 
+    private ProcessService processService;
+    private ApplicationContext applicationContext;
 
-    @Test
-    public void testDependInit() throws Exception{
 
-        TaskProps taskProps = new TaskProps();
+    @Before
+    public void before() throws Exception{
+        processService = Mockito.mock(ProcessService.class);
+        Mockito.when(processService
+                .findLastRunningProcess(4,DependentDateUtils.getTodayInterval(new Date()).get(0)))
+                .thenReturn(findLastProcessInterval());
+        Mockito.when(processService
+                .getTaskNodeListByDefinitionId(4))
+                .thenReturn(getTaskNodes());
+        Mockito.when(processService
+                .findValidTaskListByProcessId(11))
+                .thenReturn(getTaskInstances());
 
-        String dependString = "{\n" +
-                "\"dependTaskList\":[\n" +
-                "    {\n" +
-                "        \"dependItemList\":[\n" +
-                "            {\n" +
-                "                    \"definitionId\": 101,\n" +
-                "                    \"depTasks\": \"ALL\",\n" +
-                "                    \"cycle\": \"day\",\n" +
-                "                    \"dateValue\": \"last1Day\"\n" +
-                "            }\n" +
-                "        ],\n" +
-                "        \"relation\": \"AND\"\n" +
-                "    }\n" +
-                "    ],\n" +
-                "\"relation\":\"OR\"\n" +
-                "}";
+        Mockito.when(processService
+                .findTaskInstanceById(252612))
+                .thenReturn(getTaskInstance());
+        applicationContext = Mockito.mock(ApplicationContext.class);
+        SpringApplicationContext springApplicationContext = new SpringApplicationContext();
+        springApplicationContext.setApplicationContext(applicationContext);
+        Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
+    }
 
+    @Test
+    public void test() throws Exception{
+
+        TaskProps taskProps = new TaskProps();
+        String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\",\"depTasks\":\"ALL\",\"projectId\":1,\"definitionList\":[{\"label\":\"C\",\"value\":4},{\"label\":\"B\",\"value\":3},{\"label\":\"A\",\"value\":2}],\"cycle\":\"day\",\"definitionId\":4}],\"relation\":\"AND\"}],\"relation\":\"AND\"}";
         taskProps.setTaskInstId(252612);
         taskProps.setDependence(dependString);
+        taskProps.setTaskStartTime(new Date());
         DependentTask dependentTask = new DependentTask(taskProps, logger);
         dependentTask.init();
         dependentTask.handle();
-        Assert.assertEquals(dependentTask.getExitStatusCode(), Constants.EXIT_CODE_FAILURE );
+        Assert.assertEquals(dependentTask.getExitStatusCode(), Constants.EXIT_CODE_SUCCESS );
     }
 
+    private ProcessInstance findLastProcessInterval(){
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(11);
+        processInstance.setState(ExecutionStatus.SUCCESS);
+        return  processInstance;
+    }
+
+    private List<TaskNode> getTaskNodes(){
+        List<TaskNode> list = new ArrayList<>();
+        TaskNode taskNode = new TaskNode();
+        taskNode.setName("C");
+        taskNode.setType("SQL");
+        list.add(taskNode);
+        return list;
+    }
 
+    private List<TaskInstance> getTaskInstances(){
+        List<TaskInstance> list = new ArrayList<>();
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setName("C");
+        taskInstance.setState(ExecutionStatus.SUCCESS);
+        taskInstance.setDependency("1231");
+        list.add(taskInstance);
+        return list;
+    }
+
+    private TaskInstance getTaskInstance(){
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(252612);
+        taskInstance.setName("C");
+        taskInstance.setState(ExecutionStatus.SUCCESS);
+        return taskInstance;
+    }
 
 }

+ 24 - 0
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@@ -236,6 +236,30 @@ public class ProcessService {
         return processInstanceMapper.queryDetailById(processId);
     }
 
+    /**
+     * get task node list by definitionId
+     * @param defineId
+     * @return
+     */
+    public  List<TaskNode> getTaskNodeListByDefinitionId(Integer defineId){
+        ProcessDefinition processDefinition = processDefineMapper.selectById(defineId);
+        if (processDefinition == null) {
+            logger.info("process define not exists");
+            return null;
+        }
+
+        String processDefinitionJson = processDefinition.getProcessDefinitionJson();
+        ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
+
+        //process data check
+        if (null == processData) {
+            logger.error("process data is null");
+            return null;
+        }
+
+        return processData.getTasks();
+    }
+
     /**
      * find process instance by id
      * @param processId processId

+ 1 - 0
pom.xml

@@ -740,6 +740,7 @@
                         <include>**/server/worker/task/datax/DataxTaskTest.java</include>
                         <include>**/server/worker/task/shell/ShellTaskTest.java</include>
                         <include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
+                        <include>**/server/worker/task/dependent/DependentTaskTest.java</include>
                         <include>**/server/utils/DataxUtilsTest.java</include>
                         <include>**/service/zk/DefaultEnsembleProviderTest.java</include>
                         <include>**/dao/datasource/BaseDataSourceTest.java</include>