Sfoglia il codice sorgente

[Bug-9295][Master] fix repeated submit task (#9304)

Co-authored-by: caishunfeng <534328519@qq.com>
caishunfeng 3 anni fa
parent
commit
1073fcae44

+ 4 - 2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@@ -186,8 +186,10 @@ public class TaskPriorityQueueConsumer extends Thread {
             if (result) {
                 addDispatchEvent(context, executionContext);
             }
-        } catch (RuntimeException | ExecuteException e) {
-            logger.error("dispatch error: {}", e.getMessage(), e);
+        } catch (RuntimeException e) {
+            logger.error("dispatch error: ", e);
+        } catch (ExecuteException e) {
+            logger.error("dispatch error: {}", e.getMessage());
         }
         return result;
     }

+ 1 - 1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java

@@ -110,8 +110,8 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
             return;
         }
         ListenableFuture future = this.submitListenable(() -> {
-            taskExecuteThread.run();
             multiThreadFilterMap.put(taskExecuteThread.getKey(), taskExecuteThread);
+            taskExecuteThread.run();
         });
         future.addCallback(new ListenableFutureCallback() {
             @Override

+ 1 - 1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

@@ -109,8 +109,8 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
         }
         int processInstanceId = workflowExecuteThread.getProcessInstance().getId();
         ListenableFuture future = this.submitListenable(() -> {
-            workflowExecuteThread.handleEvents();
             multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread);
+            workflowExecuteThread.handleEvents();
         });
         future.addCallback(new ListenableFutureCallback() {
             @Override