Bladeren bron

Fix PeerTaskInstancePriorityQueue cannot contains method use taskInstanceId to check (#10491)

Wenjun Ruan 2 jaren geleden
bovenliggende
commit
0bdfa0cff9

+ 1 - 1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@@ -1103,7 +1103,7 @@ public class WorkflowExecuteRunnable implements Runnable {
     }
 
     /**
-     * encapsulation task
+     * encapsulation task, this method will only create a new task instance, the return task instance will not contain id.
      *
      * @param processInstance process instance
      * @param taskNode        taskNode

+ 22 - 10
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java

@@ -29,6 +29,8 @@ import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Task instances priority queue implementation
  * All the task instances are in the same process instance.
@@ -43,7 +45,7 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
      * queue
      */
     private final PriorityQueue<TaskInstance> queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
-    private final Set<Integer> taskInstanceIdSet = Collections.synchronizedSet(new HashSet<>());
+    private final Set<String> taskInstanceIdentifySet = Collections.synchronizedSet(new HashSet<>());
 
     /**
      * put task instance to priority queue
@@ -53,8 +55,9 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
      */
     @Override
     public void put(TaskInstance taskInstance) throws TaskPriorityQueueException {
+        Preconditions.checkNotNull(taskInstance);
         queue.add(taskInstance);
-        taskInstanceIdSet.add(taskInstance.getId());
+        taskInstanceIdentifySet.add(getTaskInstanceIdentify(taskInstance));
     }
 
     /**
@@ -67,7 +70,7 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
     public TaskInstance take() throws TaskPriorityQueueException {
         TaskInstance taskInstance = queue.poll();
         if (taskInstance != null) {
-            taskInstanceIdSet.remove(taskInstance.getId());
+            taskInstanceIdentifySet.remove(getTaskInstanceIdentify(taskInstance));
         }
         return taskInstance;
     }
@@ -114,7 +117,7 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
      */
     public void clear() {
         queue.clear();
-        taskInstanceIdSet.clear();
+        taskInstanceIdentifySet.clear();
     }
 
     /**
@@ -124,11 +127,8 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
      * @return true is contains
      */
     public boolean contains(TaskInstance taskInstance) {
-        return this.contains(taskInstance.getId());
-    }
-
-    public boolean contains(int taskInstanceId) {
-        return taskInstanceIdSet.contains(taskInstanceId);
+        Preconditions.checkNotNull(taskInstance);
+        return taskInstanceIdentifySet.contains(getTaskInstanceIdentify(taskInstance));
     }
 
     /**
@@ -138,6 +138,8 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
      * @return true if remove success
      */
     public boolean remove(TaskInstance taskInstance) {
+        Preconditions.checkNotNull(taskInstance);
+        taskInstanceIdentifySet.remove(getTaskInstanceIdentify(taskInstance));
         return queue.remove(taskInstance);
     }
 
@@ -150,10 +152,20 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
         return queue.iterator();
     }
 
+    // since the task instance will not contain taskInstanceId until insert into database
+    // So we use processInstanceId + taskCode + version to identify a taskInstance.
+    private String getTaskInstanceIdentify(TaskInstance taskInstance) {
+        return String.join(
+                String.valueOf(taskInstance.getProcessInstanceId()),
+                String.valueOf(taskInstance.getTaskCode()),
+                String.valueOf(taskInstance.getTaskDefinitionVersion())
+                , "-");
+    }
+
     /**
      * TaskInfoComparator
      */
-    private class TaskInfoComparator implements Comparator<TaskInstance> {
+    private static class TaskInfoComparator implements Comparator<TaskInstance> {
 
         /**
          * compare o1 o2

+ 6 - 0
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java

@@ -36,6 +36,8 @@ public class PeerTaskInstancePriorityQueueTest {
         queue.put(taskInstanceHigPriority);
         queue.put(taskInstanceMediumPriority);
         Assert.assertEquals(2, queue.size());
+        Assert.assertTrue(queue.contains(taskInstanceHigPriority));
+        Assert.assertTrue(queue.contains(taskInstanceMediumPriority));
     }
 
     @Test
@@ -108,6 +110,9 @@ public class PeerTaskInstancePriorityQueueTest {
         TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
         queue.put(taskInstanceMediumPriority);
         Assert.assertTrue(queue.contains(taskInstanceMediumPriority));
+        TaskInstance taskInstance2 = createTaskInstance("medium2", Priority.MEDIUM, 1);
+        taskInstance2.setProcessInstanceId(2);
+        Assert.assertFalse(queue.contains(taskInstance2));
     }
 
     @Test
@@ -118,6 +123,7 @@ public class PeerTaskInstancePriorityQueueTest {
         int peekBeforeLength = queue.size();
         queue.remove(taskInstanceMediumPriority);
         Assert.assertNotEquals(peekBeforeLength, queue.size());
+        Assert.assertFalse(queue.contains(taskInstanceMediumPriority));
     }
 
     /**