Quellcode durchsuchen

TaskGroupPriority only compare When TaskGroup is same (#15486)

Wenjun Ruan vor 1 Jahr
Ursprung
Commit
44c356db4d

+ 17 - 16
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@@ -91,7 +91,7 @@ import org.apache.dolphinscheduler.service.exceptions.CronParseException;
 import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.service.model.TaskNode;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
+import org.apache.dolphinscheduler.service.queue.StandByTaskInstancePriorityQueue;
 import org.apache.dolphinscheduler.service.utils.DagHelper;
 
 import org.apache.commons.collections4.CollectionUtils;
@@ -208,7 +208,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
     /**
      * The StandBy task list, will be executed, need to know, the taskInstance in this queue may doesn't have id.
      */
-    private final PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
+    private final StandByTaskInstancePriorityQueue standByTaskInstancePriorityQueue =
+            new StandByTaskInstancePriorityQueue();
 
     /**
      * wait to retry taskInstance map, taskCode as key, taskInstance as value
@@ -249,7 +250,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
         this.taskInstanceDao = taskInstanceDao;
         this.defaultTaskExecuteRunnableFactory = defaultTaskExecuteRunnableFactory;
         this.listenerEventAlertManager = listenerEventAlertManager;
-        TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size);
+        TaskMetrics.registerTaskPrepared(standByTaskInstancePriorityQueue::size);
     }
 
     /**
@@ -1430,7 +1431,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
         // if previous node success , post node submit
         for (TaskInstance task : taskInstances) {
 
-            if (readyToSubmitTaskQueue.contains(task)) {
+            if (standByTaskInstancePriorityQueue.contains(task)) {
                 log.warn("Task is already at submit queue, taskInstanceName: {}", task.getName());
                 continue;
             }
@@ -1665,7 +1666,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
                 return true;
             }
             if (workflowInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
-                return readyToSubmitTaskQueue.size() == 0 && taskExecuteRunnableMap.size() == 0
+                return standByTaskInstancePriorityQueue.size() == 0 && taskExecuteRunnableMap.size() == 0
                         && waitToRetryTaskInstanceMap.size() == 0;
             }
         }
@@ -1688,7 +1689,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
 
         List<TaskInstance> pauseList = getCompleteTaskByState(TaskExecutionStatus.PAUSE);
         if (CollectionUtils.isNotEmpty(pauseList) || workflowInstance.isBlocked() || !isComplementEnd()
-                || readyToSubmitTaskQueue.size() > 0) {
+                || standByTaskInstancePriorityQueue.size() > 0) {
             return WorkflowExecutionStatus.PAUSE;
         } else {
             return WorkflowExecutionStatus.SUCCESS;
@@ -1711,8 +1712,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
                 }
             }
         }
-        if (readyToSubmitTaskQueue.size() > 0) {
-            for (Iterator<TaskInstance> iter = readyToSubmitTaskQueue.iterator(); iter.hasNext();) {
+        if (standByTaskInstancePriorityQueue.size() > 0) {
+            for (Iterator<TaskInstance> iter = standByTaskInstancePriorityQueue.iterator(); iter.hasNext();) {
                 iter.next().setState(TaskExecutionStatus.PAUSE);
             }
         }
@@ -1773,7 +1774,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
         // success
         if (state == WorkflowExecutionStatus.RUNNING_EXECUTION) {
             List<TaskInstance> killTasks = getCompleteTaskByState(TaskExecutionStatus.KILL);
-            if (readyToSubmitTaskQueue.size() > 0 || waitToRetryTaskInstanceMap.size() > 0) {
+            if (standByTaskInstancePriorityQueue.size() > 0 || waitToRetryTaskInstanceMap.size() > 0) {
                 // tasks currently pending submission, no retries, indicating that depend is waiting to complete
                 return WorkflowExecutionStatus.RUNNING_EXECUTION;
             } else if (CollectionUtils.isNotEmpty(killTasks)) {
@@ -1878,7 +1879,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
      * @param taskInstance task instance
      */
     public void addTaskToStandByList(TaskInstance taskInstance) {
-        if (readyToSubmitTaskQueue.contains(taskInstance)) {
+        if (standByTaskInstancePriorityQueue.contains(taskInstance)) {
             log.warn("Task already exists in ready submit queue, no need to add again, task code:{}",
                     taskInstance.getTaskCode());
             return;
@@ -1888,7 +1889,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
                 taskInstance.getId(),
                 taskInstance.getTaskCode());
         TaskMetrics.incTaskInstanceByState("submit");
-        readyToSubmitTaskQueue.put(taskInstance);
+        standByTaskInstancePriorityQueue.put(taskInstance);
     }
 
     /**
@@ -1897,7 +1898,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
      * @param taskInstance task instance
      */
     private boolean removeTaskFromStandbyList(TaskInstance taskInstance) {
-        return readyToSubmitTaskQueue.remove(taskInstance);
+        return standByTaskInstancePriorityQueue.remove(taskInstance);
     }
 
     /**
@@ -1906,7 +1907,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
      * @return Boolean whether has retry task in standby
      */
     private boolean hasRetryTaskInStandBy() {
-        for (Iterator<TaskInstance> iter = readyToSubmitTaskQueue.iterator(); iter.hasNext();) {
+        for (Iterator<TaskInstance> iter = standByTaskInstancePriorityQueue.iterator(); iter.hasNext();) {
             if (iter.next().getState().isFailure()) {
                 return true;
             }
@@ -1923,8 +1924,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
                 workflowInstance.getId(),
                 taskExecuteRunnableMap.size());
 
-        if (readyToSubmitTaskQueue.size() > 0) {
-            readyToSubmitTaskQueue.clear();
+        if (standByTaskInstancePriorityQueue.size() > 0) {
+            standByTaskInstancePriorityQueue.clear();
         }
 
         for (long taskCode : taskExecuteRunnableMap.keySet()) {
@@ -1965,7 +1966,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
     public void submitStandByTask() throws StateEventHandleException {
         ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance();
         TaskInstance task;
-        while ((task = readyToSubmitTaskQueue.peek()) != null) {
+        while ((task = standByTaskInstancePriorityQueue.peek()) != null) {
             // stop tasks which is retrying if forced success happens
             if (task.getId() != null && task.taskCanRetry()) {
                 TaskInstance retryTask = taskInstanceDao.queryById(task.getId());

+ 18 - 17
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java

@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.service.queue;
 
-import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
 
@@ -35,7 +34,7 @@ import com.google.common.base.Preconditions;
  * Task instances priority queue implementation
  * All the task instances are in the same process instance.
  */
-public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInstance> {
+public class StandByTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInstance> {
 
     /**
      * queue size
@@ -45,7 +44,8 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
     /**
      * queue
      */
-    private final PriorityQueue<TaskInstance> queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
+    private final PriorityQueue<TaskInstance> queue =
+            new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInstancePriorityComparator());
     private final Set<String> taskInstanceIdentifySet = Collections.synchronizedSet(new HashSet<>());
 
     /**
@@ -163,24 +163,25 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
     }
 
     /**
-     * TaskInfoComparator
+     * This comparator is used to sort task instances in the standby queue.
+     * If the TaskInstance is in the same taskGroup, then we will sort the TaskInstance by {@link TaskInstance#getTaskGroupPriority()} in the taskGroup.
+     * Otherwise, we will sort the TaskInstance by {@link TaskInstance#getTaskInstancePriority()} in the workflow.
      */
-    private static class TaskInfoComparator implements Comparator<TaskInstance> {
-
-        /**
-         * compare o1 o2
-         *
-         * @param o1 o1
-         * @param o2 o2
-         * @return compare result
-         */
+    private static class TaskInstancePriorityComparator implements Comparator<TaskInstance> {
+
         @Override
         public int compare(TaskInstance o1, TaskInstance o2) {
-            if (o1.getTaskInstancePriority().equals(o2.getTaskInstancePriority())) {
-                // larger number, higher priority
-                return Constants.OPPOSITE_VALUE * Integer.compare(o1.getTaskGroupPriority(), o2.getTaskGroupPriority());
+            int taskPriorityInTaskGroup = -1 * Integer.compare(o1.getTaskGroupPriority(), o2.getTaskGroupPriority());
+            int taskInstancePriorityInWorkflow =
+                    Long.compare(o1.getTaskInstancePriority().getCode(), o2.getTaskInstancePriority().getCode());
+
+            if (o1.getTaskGroupId() == o2.getTaskGroupId()) {
+                // If at the same taskGroup
+                if (taskPriorityInTaskGroup != 0) {
+                    return taskPriorityInTaskGroup;
+                }
             }
-            return o1.getTaskInstancePriority().compareTo(o2.getTaskInstancePriority());
+            return taskInstancePriorityInWorkflow;
         }
     }
 }

+ 15 - 14
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java

@@ -26,11 +26,11 @@ import java.util.concurrent.TimeUnit;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-public class PeerTaskInstancePriorityQueueTest {
+public class StandByTaskInstancePriorityQueueTest {
 
     @Test
     public void put() throws TaskPriorityQueueException {
-        PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
+        StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
         TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
         TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
         queue.put(taskInstanceHigPriority);
@@ -42,7 +42,7 @@ public class PeerTaskInstancePriorityQueueTest {
 
     @Test
     public void take() throws Exception {
-        PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
+        StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
         int peekBeforeLength = queue.size();
         queue.take();
         Assertions.assertTrue(queue.size() < peekBeforeLength);
@@ -50,7 +50,7 @@ public class PeerTaskInstancePriorityQueueTest {
 
     @Test
     public void poll() throws Exception {
-        PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
+        StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
         Assertions.assertThrows(TaskPriorityQueueException.class, () -> {
             queue.poll(1000, TimeUnit.MILLISECONDS);
         });
@@ -58,14 +58,15 @@ public class PeerTaskInstancePriorityQueueTest {
 
     @Test
     public void peek() throws Exception {
-        PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
+        StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
         int peekBeforeLength = queue.size();
         Assertions.assertEquals(peekBeforeLength, queue.size());
     }
 
     @Test
     public void peekTaskGroupPriority() throws Exception {
-        PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
+        StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
+
         TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 2);
         TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.HIGH, 1);
         queue.put(taskInstanceMediumPriority);
@@ -80,7 +81,7 @@ public class PeerTaskInstancePriorityQueueTest {
         queue.put(taskInstanceHigPriority);
         taskInstance = queue.peek();
         queue.clear();
-        Assertions.assertEquals(taskInstance.getName(), "medium");
+        Assertions.assertEquals("medium", taskInstance.getName());
 
         taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
         taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 2);
@@ -88,7 +89,7 @@ public class PeerTaskInstancePriorityQueueTest {
         queue.put(taskInstanceHigPriority);
         taskInstance = queue.peek();
         queue.clear();
-        Assertions.assertEquals(taskInstance.getName(), "high");
+        Assertions.assertEquals("medium", taskInstance.getName());
 
         taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
         taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
@@ -96,7 +97,7 @@ public class PeerTaskInstancePriorityQueueTest {
         queue.put(taskInstanceHigPriority);
         taskInstance = queue.peek();
         queue.clear();
-        Assertions.assertEquals(taskInstance.getName(), "high");
+        Assertions.assertEquals("high", taskInstance.getName());
 
     }
 
@@ -107,7 +108,7 @@ public class PeerTaskInstancePriorityQueueTest {
 
     @Test
     public void contains() throws Exception {
-        PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
+        StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
         TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
         queue.put(taskInstanceMediumPriority);
         Assertions.assertTrue(queue.contains(taskInstanceMediumPriority));
@@ -117,8 +118,8 @@ public class PeerTaskInstancePriorityQueueTest {
     }
 
     @Test
-    public void remove() throws Exception {
-        PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
+    public void remove() {
+        StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
         TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
         queue.put(taskInstanceMediumPriority);
         int peekBeforeLength = queue.size();
@@ -133,8 +134,8 @@ public class PeerTaskInstancePriorityQueueTest {
      * @return queue
      * @throws Exception
      */
-    private PeerTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception {
-        PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
+    private StandByTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception {
+        StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
         TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
         TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
         taskInstanceHigPriority.setTaskGroupPriority(3);