Browse Source

[BUG FIX]fix bug: Restart the worker service again, the previously submitted successful tasks are not executed bug (#2800)

* feature: add number configuration for master dispatch tasks

* fix bug(#2762) the master would be blocked when worker group not exists

* fix bug(#2762) the master would be blocked when worker group not exists

* fix ut

* fix ut

* fix bug(2781): cannot pause work flow when task state is "submit success"

* fix code smell

* add mysql other param blank judge

* test

* update comments

* update comments

* add ut

* fix bug: Restart the worker service again, the previously submitted successful tasks are not executed

* update comments

* add sleep

Co-authored-by: baoliang <baoliang@analysys.com.cn>
bao liang 4 years ago
parent
commit
3e297bfd2e

+ 7 - 2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.server.master.consumer;
 
 import com.alibaba.fastjson.JSONObject;
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.enums.UdfType;
@@ -107,6 +108,10 @@ public class TaskPriorityQueueConsumer extends Thread{
                 int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber();
                 failedDispatchTasks.clear();
                 for(int i = 0; i < fetchTaskNum; i++){
+                    if(taskPriorityQueue.size() <= 0){
+                        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+                        continue;
+                    }
                     // if not task , blocking here
                     String taskPriorityInfo = taskPriorityQueue.take();
                     TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo);
@@ -115,8 +120,8 @@ public class TaskPriorityQueueConsumer extends Thread{
                         failedDispatchTasks.add(taskPriorityInfo);
                     }
                 }
-                for(String taskPriorityInfo: failedDispatchTasks){
-                    taskPriorityQueue.put(taskPriorityInfo);
+                for(String dispatchFailedTask : failedDispatchTasks){
+                    taskPriorityQueue.put(dispatchFailedTask);
                 }
             }catch (Exception e){
                 logger.error("dispatcher task error",e);