Browse Source

Fix issue #1477: some tasks stay in the running state indefinitely when the database is delayed (#1477) (#1501)

* fix issue:1477 some tasks would be running all the time when db delayed

* fix issue:1477 some tasks would be running all the time when db delayed

* fix issue:1477 some tasks would be running all the time when db delayed
bao liang 5 years ago
parent
commit
21cb38d1dd

+ 1 - 1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@@ -439,7 +439,7 @@ public final class Constants {
     /**
      * default master commit retry interval
      */
-    public static final int defaultMasterCommitRetryInterval = 100;
+    public static final int defaultMasterCommitRetryInterval = 3000;
 
     /**
      * time unit secong to minutes

+ 1 - 1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java

@@ -45,7 +45,7 @@ public interface ITaskQueue {
      * @param key  queue name
      * @param value
      */
-    void add(String key, String value);
+    boolean add(String key, String value);
 
     /**
      * an element pops out of the queue

+ 3 - 1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java

@@ -118,14 +118,16 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
      * @param value    ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
      */
     @Override
-    public void add(String key, String value) {
+    public boolean add(String key, String value){
         try {
             String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value;
             String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value));
 
             logger.info("add task : {} to tasks queue , result success",result);
+            return true;
         } catch (Exception e) {
             logger.error("add task to tasks queue exception",e);
+            return false;
         }
 
     }

+ 33 - 25
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java

@@ -758,7 +758,7 @@ public class ProcessDao {
     }
 
     /**
-     * submit task to mysql and task queue
+     * submit task to db
      * submit sub process to command
      * @param taskInstance taskInstance
      * @param processInstance processInstance
@@ -769,21 +769,18 @@ public class ProcessDao {
         logger.info("start submit task : {}, instance id:{}, state: {}, ",
                 taskInstance.getName(), processInstance.getId(), processInstance.getState() );
         processInstance = this.findProcessInstanceDetailById(processInstance.getId());
-        //submit to mysql
-        TaskInstance task= submitTaskInstanceToMysql(taskInstance, processInstance);
-        if(task.isSubProcess() && !task.getState().typeIsFinished()){
-            ProcessInstanceMap processInstanceMap = setProcessInstanceMap(processInstance, task);
-
-            TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
-            Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
-            Integer defineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
-            createSubWorkProcessCommand(processInstance, processInstanceMap, defineId, task);
-        }else if(!task.getState().typeIsFinished()){
-            //submit to task queue
-            task.setProcessInstancePriority(processInstance.getProcessInstancePriority());
-            submitTaskToQueue(task);
-        }
-        logger.info("submit task :{} state:{} complete, instance id:{} state: {}  ",
+        //submit to db
+        TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance);
+        if(task == null){
+            logger.error("end submit task to db error, task name:{}, process id:{} state: {} ",
+                    taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState());
+            return task;
+        }
+        if(!task.getState().typeIsFinished()){
+            createSubWorkProcessCommand(processInstance, task);
+        }
+
+        logger.info("end submit task to db successfully:{} state:{} complete, instance id:{} state: {}  ",
                 taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
         return task;
     }
@@ -845,13 +842,18 @@ public class ProcessDao {
     /**
      * create sub work process command
      * @param parentProcessInstance parentProcessInstance
-     * @param instanceMap instanceMap
-     * @param childDefineId instanceMap
      * @param task task
      */
     private void createSubWorkProcessCommand(ProcessInstance parentProcessInstance,
-                                             ProcessInstanceMap instanceMap,
-                                             Integer childDefineId, TaskInstance task){
+                                             TaskInstance task){
+        if(!task.isSubProcess()){
+            return;
+        }
+        ProcessInstanceMap instanceMap = setProcessInstanceMap(parentProcessInstance, task);
+        TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
+        Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
+        Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
+
         ProcessInstance childInstance = findSubProcessInstance(parentProcessInstance.getId(), task.getId());
 
         CommandType fatherType = parentProcessInstance.getCommandType();
@@ -921,7 +923,7 @@ public class ProcessDao {
      * @param processInstance processInstance
      * @return task instance
      */
-    public TaskInstance submitTaskInstanceToMysql(TaskInstance taskInstance, ProcessInstance processInstance){
+    public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance){
         ExecutionStatus processInstanceState = processInstance.getState();
 
         if(taskInstance.getState().typeIsFailure()){
@@ -949,7 +951,10 @@ public class ProcessDao {
         taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
         taskInstance.setState(getSubmitTaskState(taskInstance, processInstanceState));
         taskInstance.setSubmitTime(new Date());
-        saveTaskInstance(taskInstance);
+        boolean saveResult = saveTaskInstance(taskInstance);
+        if(!saveResult){
+            return null;
+        }
         return taskInstance;
     }
 
@@ -961,6 +966,10 @@ public class ProcessDao {
     public Boolean submitTaskToQueue(TaskInstance taskInstance) {
 
         try{
+            if(taskInstance.getState().typeIsFinished()){
+                logger.info(String.format("submit to task queue, but task [%s] state [%s] is already  finished. ", taskInstance.getName(), taskInstance.getState().toString()));
+                return true;
+            }
             // task cannot submit when running
             if(taskInstance.getState() == ExecutionStatus.RUNNING_EXEUTION){
                 logger.info(String.format("submit to task queue, but task [%s] state already be running. ", taskInstance.getName()));
@@ -971,14 +980,13 @@ public class ProcessDao {
                 return true;
             }
             logger.info("task ready to queue: {}" , taskInstance);
-            taskQueue.add(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo(taskInstance));
+            boolean insertQueueResult = taskQueue.add(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo(taskInstance));
             logger.info(String.format("master insert into queue success, task : %s", taskInstance.getName()) );
-            return true;
+            return insertQueueResult;
         }catch (Exception e){
             logger.error("submit task to queue Exception: ", e);
             logger.error("task queue error : %s", JSONUtils.toJson(taskInstance));
             return false;
-
         }
     }
 

+ 22 - 6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@@ -114,21 +114,37 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
         Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval();
 
         int retryTimes = 1;
-
-        while (retryTimes <= commitRetryTimes){
+        boolean taskDBFlag = false;
+        boolean taskQueueFlag = false;
+        TaskInstance task = null;
+        while (true){
             try {
-                TaskInstance task = processDao.submitTask(taskInstance, processInstance);
-                if(task != null){
+                if(!taskDBFlag){
+                    // submit task to db
+                    task = processDao.submitTask(taskInstance, processInstance);
+                    if(task != null && task.getId() != 0){
+                        taskDBFlag = true;
+                    }
+                }
+                if(taskDBFlag && !taskQueueFlag){
+                    // submit task to queue
+                    taskQueueFlag = processDao.submitTaskToQueue(task);
+                }
+                if(taskDBFlag && taskQueueFlag){
                     return task;
                 }
-                logger.error("task commit to mysql and queue failed , task has already retry {} times, please check the database", commitRetryTimes);
+                if(!taskDBFlag){
+                    logger.error("task commit to db failed , task has already retry {} times, please check the database", retryTimes);
+                }else if(!taskQueueFlag){
+                    logger.error("task commit to queue failed , task has already retry {} times, please check the database", retryTimes);
+
+                }
                 Thread.sleep(commitRetryInterval);
             } catch (Exception e) {
                 logger.error("task commit to mysql and queue failed : " + e.getMessage(),e);
             }
             retryTimes += 1;
         }
-        return null;
     }
 
     /**

+ 2 - 0
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java

@@ -91,6 +91,8 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
     public Boolean waitTaskQuit(){
         // query new state
         taskInstance = processDao.findTaskInstanceById(taskInstance.getId());
+        logger.info("wait task: process id: {}, task id:{}, task name:{} complete",
+                this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName());
         // task time out
         Boolean checkTimeout = false;
         TaskTimeoutParameter taskTimeoutParameter = getTaskTimeoutParameter();