Browse Source

fix bug: task cannot submit when recovery failover (#1011)

* update english documents

* refactor zk client

* update documents

* update zkclient

* update zkclient

* update documents

* add architecture-design

* change i18n

* update i18n

* update english documents

* add architecture-design

* update english documents

* update en-US documents

* add architecture-design

* update demo site

* add mybatis plus model

* modify mybatisplus

* modify mybatisplus

* change interface by mybatisplus

* add unit test

* refactor dao interface.

* add unit test for dao...

* add unit test for dao...

* add unit test for dao...

* Merge remote-tracking branch 'upstream/dev-db' into dev-db

# Conflicts:
#	dolphinscheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ProjectMapper.xml
#	dolphinscheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ScheduleMapper.xml
#	escheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ProcessInstanceMapper.xml
#	escheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ProjectUserMapper.xml
#	escheduler-dao/src/main/resources/cn.escheduler.dao.mapper/QueueMapper.xml
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ProcessInstanceMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ProjectUserMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/QueueMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ResourceUserMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ScheduleMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/SessionMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/TenantMapperTest.java

* Merge remote-tracking branch 'upstream/dev-db' into dev-db

# Conflicts:
#	dolphinscheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ProjectMapper.xml
#	dolphinscheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ScheduleMapper.xml
#	escheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ProcessInstanceMapper.xml
#	escheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ProjectUserMapper.xml
#	escheduler-dao/src/main/resources/cn.escheduler.dao.mapper/QueueMapper.xml
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ProcessInstanceMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ProjectUserMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/QueueMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ResourceUserMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ScheduleMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/SessionMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/TenantMapperTest.java

* Merge remote-tracking branch 'upstream/dev-db' into dev-db

# Conflicts:
#	dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml
#	dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml
#	dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml

* update some dao bugs

* update for some bugs

* update some bugs

* Merge remote-tracking branch 'upstream/dev-db' into dev-db

# Conflicts:
#	dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml
#	dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml
#	dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml

* update

* update

* add multiply settings for application.yml

* add multiply settings for application.yml

* revert

* update configuration settings in task record dao...

* change application_master to application-master

* change application_master to application-master

* update application.yml to application.properties

* revert

* revert

* add properties

* add properties

* revert

* revert

* add api start up..
add alert send try catch

* update dao info level

* fix bug: task cannot submit when recovery failover

* fix bug: task cannot submit when recovery failover

* merge from dev-db

* revert
bao liang 5 years ago
parent
commit
07c4e6835e

+ 3 - 0
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java

@@ -332,6 +332,9 @@ public class DataAnalysisService {
             projectIds.add(projectId);
         }else if(loginUser.getUserType() == UserType.GENERAL_USER){
             projectIds = processDao.getProjectIdListHavePerm(loginUser.getId());
+            if(projectIds.size() ==0 ){
+                projectIds.add(0);
+            }
         }
         return projectIds.toArray(new Integer[projectIds.size()]);
     }

+ 18 - 13
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

@@ -49,7 +49,7 @@ public class DagHelper {
      * @param taskNodeList
      * @return
      */
-    private static List<TaskNodeRelation> generateRelationListByFlowNodes(List<TaskNode> taskNodeList) {
+    public static List<TaskNodeRelation> generateRelationListByFlowNodes(List<TaskNode> taskNodeList) {
         List<TaskNodeRelation> nodeRelationList = new ArrayList<>();
         for (TaskNode taskNode : taskNodeList) {
             String preTasks = taskNode.getPreTasks();
@@ -73,8 +73,8 @@ public class DagHelper {
      * @param taskDependType
      * @return
      */
-    private static List<TaskNode> generateFlowNodeListByStartNode(List<TaskNode> taskNodeList, List<String> startNodeNameList,
-                                                                  List<String> recoveryNodeNameList, TaskDependType taskDependType) {
+    public static List<TaskNode> generateFlowNodeListByStartNode(List<TaskNode> taskNodeList, List<String> startNodeNameList,
+                                                                 List<String> recoveryNodeNameList, TaskDependType taskDependType) {
         List<TaskNode> destFlowNodeList = new ArrayList<>();
         List<String> startNodeList = startNodeNameList;
 
@@ -262,35 +262,41 @@ public class DagHelper {
         for(String start : startVertexs){
             TaskNode startNode = dag.getNode(start);
             if(!startNode.isForbidden() && !completeTaskList.containsKey(start)){
+                // the start can be submit if not forbidden and not in complete tasks
                 continue;
             }
+            // then submit the post nodes
             Collection<String> postNodes = getStartVertex(start, dag, completeTaskList);
-
             for(String post : postNodes){
-                if(checkForbiddenPostCanSubmit(post, dag)){
+                TaskNode postNode = dag.getNode(post);
+                if(taskNodeCanSubmit(postNode, dag, completeTaskList)){
                     tmpStartVertexs.add(post);
                 }
             }
             tmpStartVertexs.remove(start);
         }
-
         return tmpStartVertexs;
     }
 
     /**
-     *
-     * @param postNodeName
+     * the task can be submit when  all the depends nodes are forbidden or complete
+     * @param taskNode
      * @param dag
+     * @param completeTaskList
      * @return
      */
-    private static boolean checkForbiddenPostCanSubmit(String postNodeName,  DAG<String, TaskNode, TaskNodeRelation> dag){
+    public static boolean taskNodeCanSubmit(TaskNode taskNode,
+                                            DAG<String, TaskNode, TaskNodeRelation> dag,
+                                            Map<String, TaskInstance> completeTaskList) {
 
-        TaskNode postNode = dag.getNode(postNodeName);
-        List<String> dependList = postNode.getDepList();
+        List<String> dependList = taskNode.getDepList();
+        if(dependList == null){
+            return true;
+        }
 
         for(String dependNodeName : dependList){
             TaskNode dependNode = dag.getNode(dependNodeName);
-            if(!dependNode.isForbidden()){
+            if(!dependNode.isForbidden() && !completeTaskList.containsKey(dependNodeName)){
                 return false;
             }
         }
@@ -298,7 +304,6 @@ public class DagHelper {
     }
 
 
-
     /***
      * generate dag graph
      * @param processDag

+ 0 - 1
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml

@@ -50,7 +50,6 @@
                 #{i}
             </foreach>
         </if>
-        and t.flag = 1
         <if test="startTime != null and endTime != null">
             and t.start_time > #{startTime} and t.start_time <![CDATA[ <= ]]> #{endTime}
         </if>

+ 114 - 0
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java

@@ -0,0 +1,114 @@
+package org.apache.dolphinscheduler.dao.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.common.process.ProcessDag;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DagHelperTest {
+
+
+    @Test
+    public void testTaskNodeCanSubmit() throws JsonProcessingException {
+
+
+        //1->2->3->5
+        //4->3
+        DAG<String, TaskNode, TaskNodeRelation> dag = generateDag();
+        TaskNode taskNode3 = dag.getNode("3");
+        Map<String, TaskInstance > completeTaskList = new HashMap<>();
+        completeTaskList.putIfAbsent("1", new TaskInstance());
+        Boolean canSubmit = false;
+
+        // 2/4 are forbidden submit 3
+        TaskNode node2 = dag.getNode("2");
+        node2.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
+        TaskNode nodex = dag.getNode("4");
+        nodex.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
+        canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList);
+        Assert.assertEquals(canSubmit, true);
+
+        // 2forbidden, 3 cannot be submit
+        completeTaskList.putIfAbsent("2", new TaskInstance());
+        TaskNode nodey = dag.getNode("4");
+        nodey.setRunFlag("");
+        canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList);
+        Assert.assertEquals(canSubmit, false);
+
+        // 2/3 forbidden submit 5
+        TaskNode node3 = dag.getNode("3");
+        node3.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
+        TaskNode node5 = dag.getNode("5");
+        canSubmit = DagHelper.taskNodeCanSubmit(node5, dag, completeTaskList);
+        Assert.assertEquals(canSubmit, true);
+   }
+
+    /**
+     * 1->2->3->5
+     * 4->3
+     * @return
+     * @throws JsonProcessingException
+     */
+    private DAG<String, TaskNode, TaskNodeRelation> generateDag() throws JsonProcessingException {
+        List<TaskNode> taskNodeList = new ArrayList<>();
+        TaskNode node1 = new TaskNode();
+        node1.setId("1");
+        node1.setName("1");
+        taskNodeList.add(node1);
+
+        TaskNode node2 = new TaskNode();
+        node2.setId("2");
+        node2.setName("2");
+        List<String> dep2 = new ArrayList<>();
+        dep2.add("1");
+        node2.setDepList(dep2);
+        taskNodeList.add(node2);
+
+
+        TaskNode node4 = new TaskNode();
+        node4.setId("4");
+        node4.setName("4");
+//        node4.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
+        taskNodeList.add(node4);
+
+        TaskNode node3 = new TaskNode();
+        node3.setId("3");
+        node3.setName("3");
+        List<String> dep3 = new ArrayList<>();
+        dep3.add("2");
+        dep3.add("4");
+        node3.setDepList(dep3);
+        taskNodeList.add(node3);
+
+        TaskNode node5 = new TaskNode();
+        node5.setId("5");
+        node5.setName("5");
+        List<String> dep5 = new ArrayList<>();
+        dep5.add("3");
+        node5.setDepList(dep5);
+        taskNodeList.add(node5);
+
+        List<String> startNodes = new ArrayList<>();
+        List<String> recoveryNodes  = new ArrayList<>();
+        List<TaskNode> destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList,
+                startNodes, recoveryNodes, TaskDependType.TASK_POST);
+        List<TaskNodeRelation> taskNodeRelations =DagHelper.generateRelationListByFlowNodes(destTaskNodeList);
+        ProcessDag processDag = new ProcessDag();
+        processDag.setEdges(taskNodeRelations);
+        processDag.setNodes(destTaskNodeList);
+
+        return DagHelper.buildDagGraph(processDag);
+    }
+
+}

+ 17 - 13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java

@@ -168,19 +168,23 @@ public class AlertManager {
      * @param toleranceTaskList
      */
     public void sendAlertWorkerToleranceFault(ProcessInstance processInstance, List<TaskInstance> toleranceTaskList){
-        Alert alert = new Alert();
-        alert.setTitle("worker fault tolerance");
-        alert.setShowType(ShowType.TABLE);
-        String content = getWorkerToleranceContent(processInstance, toleranceTaskList);
-        alert.setContent(content);
-        alert.setAlertType(AlertType.EMAIL);
-        alert.setCreateTime(new Date());
-        alert.setAlertGroupId(processInstance.getWarningGroupId() == null ? 1:processInstance.getWarningGroupId());
-        alert.setReceivers(processInstance.getProcessDefinition().getReceivers());
-        alert.setReceiversCc(processInstance.getProcessDefinition().getReceiversCc());
-
-        alertDao.addAlert(alert);
-        logger.info("add alert to db , alert : {}", alert.toString());
+        try{
+            Alert alert = new Alert();
+            alert.setTitle("worker fault tolerance");
+            alert.setShowType(ShowType.TABLE);
+            String content = getWorkerToleranceContent(processInstance, toleranceTaskList);
+            alert.setContent(content);
+            alert.setAlertType(AlertType.EMAIL);
+            alert.setCreateTime(new Date());
+            alert.setAlertGroupId(processInstance.getWarningGroupId() == null ? 1:processInstance.getWarningGroupId());
+            alert.setReceivers(processInstance.getProcessDefinition().getReceivers());
+            alert.setReceiversCc(processInstance.getProcessDefinition().getReceiversCc());
+            alertDao.addAlert(alert);
+            logger.info("add alert to db , alert : {}", alert.toString());
+
+        }catch (Exception e){
+            logger.error("send alert failed! " + e);
+        }
 
     }