Browse Source

[Bug-7788][MasterServer] fix submit duplicate tasks sometimes when retry (#7809)

* [Bug-7788] fix submit duplicate tasks sometimes when retry

* [Bug-7788][MasterServer] fix submit duplicate tasks sometimes when retry

Co-authored-by: caishunfeng <534328519@qq.com>
wind 3 years ago
parent
commit
a8c592bd93

+ 35 - 4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@@ -81,6 +81,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -1375,13 +1376,22 @@ public class WorkflowExecuteThread {
      * @param taskInstance task instance
      */
     private void addTaskToStandByList(TaskInstance taskInstance) {
-        logger.info("add task to stand by list: {}", taskInstance.getName());
         try {
-            if (!readyToSubmitTaskQueue.contains(taskInstance)) {
-                readyToSubmitTaskQueue.put(taskInstance);
+            if (readyToSubmitTaskQueue.contains(taskInstance)) {
+                logger.warn("task was found in ready submit queue, task code:{}", taskInstance.getTaskCode());
+                return;
+            }
+            // need to check if the tasks with same task code is active
+            boolean active = hadNotFailTask(taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
+            if (active) {
+                logger.warn("task was found in active task list, task code:{}", taskInstance.getTaskCode());
+                return;
             }
+            logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}",
+                    taskInstance.getName(), taskInstance.getId(), taskInstance.getTaskCode());
+            readyToSubmitTaskQueue.put(taskInstance);
         } catch (Exception e) {
-            logger.error("add task instance to readyToSubmitTaskQueue error, taskName: {}", taskInstance.getName(), e);
+            logger.error("add task instance to readyToSubmitTaskQueue, taskName:{}, task id:{}", taskInstance.getName(), taskInstance.getId(), e);
         }
     }
 
@@ -1626,4 +1636,25 @@ public class WorkflowExecuteThread {
             return false;
         }
     }
+
+    /**
+     * check if had not fail task by taskCode and version
+     * @param taskCode
+     * @param version
+     * @return
+     */
+    private boolean hadNotFailTask(long taskCode, int version) {
+        boolean result = false;
+        for (Entry<Integer, TaskInstance> entry : taskInstanceMap.entrySet()) {
+            TaskInstance taskInstance = entry.getValue();
+            if (taskInstance.getTaskCode() == taskCode && taskInstance.getTaskDefinitionVersion() == version) {
+                if (!taskInstance.getState().typeIsFailure()) {
+                    result = true;
+                    break;
+                }
+            }
+        }
+        return result;
+    }
+
 }