Browse Source

fix repeated submit task (#9323)

Co-authored-by: caishunfeng <534328519@qq.com>
caishunfeng 3 years ago
parent
commit
40e44f6bc4

+ 2 - 4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java

@@ -109,10 +109,8 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
         if (multiThreadFilterMap.containsKey(taskExecuteThread.getKey())) {
             return;
         }
-        ListenableFuture future = this.submitListenable(() -> {
-            multiThreadFilterMap.put(taskExecuteThread.getKey(), taskExecuteThread);
-            taskExecuteThread.run();
-        });
+        multiThreadFilterMap.put(taskExecuteThread.getKey(), taskExecuteThread);
+        ListenableFuture future = this.submitListenable(taskExecuteThread::run);
         future.addCallback(new ListenableFutureCallback() {
             @Override
             public void onFailure(Throwable ex) {

+ 2 - 4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

@@ -107,11 +107,9 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
         if (multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) {
             return;
         }
+        multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread);
         int processInstanceId = workflowExecuteThread.getProcessInstance().getId();
-        ListenableFuture future = this.submitListenable(() -> {
-            multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread);
-            workflowExecuteThread.handleEvents();
-        });
+        ListenableFuture future = this.submitListenable(workflowExecuteThread::handleEvents);
         future.addCallback(new ListenableFutureCallback() {
             @Override
             public void onFailure(Throwable ex) {