Browse Source

[Improvement-9288][Master] add task event thread pool (#9293)

* add task event thread

* license heander

* ci test

* delete unuse file

* fix CI test

Co-authored-by: caishunfeng <534328519@qq.com>
caishunfeng 3 years ago
parent
commit
52ba2c6475

+ 35 - 158
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java

@@ -17,24 +17,14 @@
 
 package org.apache.dolphinscheduler.server.master.processor.queue;
 
-import org.apache.dolphinscheduler.common.enums.Event;
-import org.apache.dolphinscheduler.common.enums.StateEvent;
-import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
-import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
-import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
@@ -44,8 +34,6 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import io.netty.channel.Channel;
-
 /**
  * task manager
  */
@@ -62,46 +50,39 @@ public class TaskEventService {
      */
     private final BlockingQueue<TaskEvent> eventQueue = new LinkedBlockingQueue<>();
 
-    /**
-     * process service
-     */
-    @Autowired
-    private ProcessService processService;
-
-    /**
-     * data quality result operator
-     */
-    @Autowired
-    private DataQualityResultOperator dataQualityResultOperator;
-
     /**
      * task event worker
      */
-    private Thread taskEventWorker;
+    private Thread taskEventThread;
 
-    @Autowired
-    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+    private Thread taskEventHandlerThread;
 
     @Autowired
-    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+    private TaskExecuteThreadPool taskExecuteThreadPool;
 
     @PostConstruct
     public void start() {
-        this.taskEventWorker = new TaskEventWorker();
-        this.taskEventWorker.setName("TaskStateEventWorker");
-        this.taskEventWorker.start();
+        this.taskEventThread = new TaskEventThread();
+        this.taskEventThread.setName("TaskEventThread");
+        this.taskEventThread.start();
+
+        this.taskEventHandlerThread = new TaskEventHandlerThread();
+        this.taskEventHandlerThread.setName("TaskEventHandlerThread");
+        this.taskEventHandlerThread.start();
     }
 
     @PreDestroy
     public void stop() {
         try {
-            this.taskEventWorker.interrupt();
+            this.taskEventThread.interrupt();
+            this.taskEventHandlerThread.interrupt();
             if (!eventQueue.isEmpty()) {
                 List<TaskEvent> remainEvents = new ArrayList<>(eventQueue.size());
                 eventQueue.drainTo(remainEvents);
-                for (TaskEvent event : remainEvents) {
-                    this.persist(event);
+                for (TaskEvent taskEvent : remainEvents) {
+                    taskExecuteThreadPool.submitTaskEvent(taskEvent);
                 }
+                taskExecuteThreadPool.eventHandler();
             }
         } catch (Exception e) {
             logger.error("stop error:", e);
@@ -109,32 +90,25 @@ public class TaskEventService {
     }
 
     /**
-     * add event to queue
+     * add event
      *
      * @param taskEvent taskEvent
      */
     public void addEvent(TaskEvent taskEvent) {
-        try {
-            eventQueue.put(taskEvent);
-        } catch (InterruptedException e) {
-            logger.error("add task event : {} error :{}", taskEvent, e);
-            Thread.currentThread().interrupt();
-        }
+        taskExecuteThreadPool.submitTaskEvent(taskEvent);
     }
 
     /**
      * task worker thread
      */
-    class TaskEventWorker extends Thread {
-
+    class TaskEventThread extends Thread {
         @Override
         public void run() {
-
             while (Stopper.isRunning()) {
                 try {
                     // if not task , blocking here
                     TaskEvent taskEvent = eventQueue.take();
-                    persist(taskEvent);
+                    taskExecuteThreadPool.submitTaskEvent(taskEvent);
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     break;
@@ -147,121 +121,24 @@ public class TaskEventService {
     }
 
     /**
-     * persist task event
-     *
-     * @param taskEvent taskEvent
-     */
-    private void persist(TaskEvent taskEvent) {
-        Event event = taskEvent.getEvent();
-        int taskInstanceId = taskEvent.getTaskInstanceId();
-        int processInstanceId = taskEvent.getProcessInstanceId();
-
-        TaskInstance taskInstance;
-        WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
-        if (workflowExecuteThread != null && workflowExecuteThread.checkTaskInstanceById(taskInstanceId)) {
-            taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId);
-        } else {
-            taskInstance = processService.findTaskInstanceById(taskInstanceId);
-        }
-
-        switch (event) {
-            case DISPATCH:
-                handleDispatchEvent(taskEvent, taskInstance);
-                // dispatch event do not need to submit state event
-                return;
-            case DELAY:
-            case RUNNING:
-                handleRunningEvent(taskEvent, taskInstance);
-                break;
-            case RESULT:
-                handleResultEvent(taskEvent, taskInstance);
-                break;
-            default:
-                throw new IllegalArgumentException("invalid event type : " + event);
-        }
-
-        StateEvent stateEvent = new StateEvent();
-        stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
-        stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId());
-        stateEvent.setExecutionStatus(taskEvent.getState());
-        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
-        workflowExecuteThreadPool.submitStateEvent(stateEvent);
-    }
-
-    /**
-     * handle dispatch event
+     * event handler thread
      */
-    private void handleDispatchEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
-        if (taskInstance == null) {
-            logger.error("taskInstance is null");
-            return;
-        }
-        if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
-            return;
-        }
-        taskInstance.setState(ExecutionStatus.DISPATCH);
-        taskInstance.setHost(taskEvent.getWorkerAddress());
-        processService.saveTaskInstance(taskInstance);
-    }
+    class TaskEventHandlerThread extends Thread {
 
-    /**
-     * handle running event
-     */
-    private void handleRunningEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
-        Channel channel = taskEvent.getChannel();
-        try {
-            if (taskInstance != null) {
-                if (taskInstance.getState().typeIsFinished()) {
-                    logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState());
-                } else {
-                    taskInstance.setState(taskEvent.getState());
-                    taskInstance.setStartTime(taskEvent.getStartTime());
-                    taskInstance.setHost(taskEvent.getWorkerAddress());
-                    taskInstance.setLogPath(taskEvent.getLogPath());
-                    taskInstance.setExecutePath(taskEvent.getExecutePath());
-                    taskInstance.setPid(taskEvent.getProcessId());
-                    taskInstance.setAppLink(taskEvent.getAppIds());
-                    processService.saveTaskInstance(taskInstance);
+        @Override
+        public void run() {
+            logger.info("event handler thread started");
+            while (Stopper.isRunning()) {
+                try {
+                    taskExecuteThreadPool.eventHandler();
+                    TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    break;
+                } catch (Exception e) {
+                    logger.error("event handler thread error", e);
                 }
             }
-            // if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success
-            TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
-            channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
-        } catch (Exception e) {
-            logger.error("worker ack master error", e);
-            TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
-            channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
-        }
-    }
-
-    /**
-     * handle result event
-     */
-    private void handleResultEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
-        Channel channel = taskEvent.getChannel();
-        try {
-            if (taskInstance != null) {
-                dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
-
-                taskInstance.setStartTime(taskEvent.getStartTime());
-                taskInstance.setHost(taskEvent.getWorkerAddress());
-                taskInstance.setLogPath(taskEvent.getLogPath());
-                taskInstance.setExecutePath(taskEvent.getExecutePath());
-                taskInstance.setPid(taskEvent.getProcessId());
-                taskInstance.setAppLink(taskEvent.getAppIds());
-                taskInstance.setState(taskEvent.getState());
-                taskInstance.setEndTime(taskEvent.getEndTime());
-                taskInstance.setVarPool(taskEvent.getVarPool());
-                processService.changeOutParam(taskInstance);
-                processService.saveTaskInstance(taskInstance);
-            }
-            // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
-            TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
-            channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
-        } catch (Exception e) {
-            logger.error("worker response master error", e);
-            TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
-            channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
         }
     }
 }

+ 224 - 0
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java

@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.processor.queue;
+
+import org.apache.dolphinscheduler.common.enums.Event;
+import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.Channel;
+
+/**
+ * task execute thread
+ */
+public class TaskExecuteThread {
+
+    private static final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class);
+
+    private final int processInstanceId;
+
+    private final ConcurrentLinkedQueue<TaskEvent> events = new ConcurrentLinkedQueue<>();
+
+    private ProcessService processService;
+
+    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    private DataQualityResultOperator dataQualityResultOperator;
+
+    public TaskExecuteThread(int processInstanceId, ProcessService processService, WorkflowExecuteThreadPool workflowExecuteThreadPool,
+                             ProcessInstanceExecCacheManager processInstanceExecCacheManager, DataQualityResultOperator dataQualityResultOperator) {
+        this.processInstanceId = processInstanceId;
+        this.processService = processService;
+        this.workflowExecuteThreadPool = workflowExecuteThreadPool;
+        this.processInstanceExecCacheManager = processInstanceExecCacheManager;
+        this.dataQualityResultOperator = dataQualityResultOperator;
+    }
+
+    public void run() {
+        while (!this.events.isEmpty()) {
+            TaskEvent event = this.events.peek();
+            try {
+                persist(event);
+            } catch (Exception e) {
+                logger.error("persist error, event:{}, error: {}", event, e);
+            } finally {
+                this.events.remove(event);
+            }
+        }
+    }
+
+    public String getKey() {
+        return String.valueOf(processInstanceId);
+    }
+
+    public int eventSize() {
+        return this.events.size();
+    }
+
+    public boolean isEmpty() {
+        return this.events.isEmpty();
+    }
+
+    public Integer getProcessInstanceId() {
+        return processInstanceId;
+    }
+
+    public boolean addEvent(TaskEvent event) {
+        if (event.getProcessInstanceId() != this.processInstanceId) {
+            logger.warn("event would be abounded, task instance id:{}, process instance id:{}, this.processInstanceId:{}",
+                    event.getTaskInstanceId(), event.getProcessInstanceId(), this.processInstanceId);
+            return false;
+        }
+        return this.events.add(event);
+    }
+
+    /**
+     * persist task event
+     *
+     * @param taskEvent taskEvent
+     */
+    private void persist(TaskEvent taskEvent) {
+        Event event = taskEvent.getEvent();
+        int taskInstanceId = taskEvent.getTaskInstanceId();
+        int processInstanceId = taskEvent.getProcessInstanceId();
+
+        TaskInstance taskInstance;
+        WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+        if (workflowExecuteThread != null && workflowExecuteThread.checkTaskInstanceById(taskInstanceId)) {
+            taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId);
+        } else {
+            taskInstance = processService.findTaskInstanceById(taskInstanceId);
+        }
+
+        switch (event) {
+            case DISPATCH:
+                handleDispatchEvent(taskEvent, taskInstance);
+                // dispatch event do not need to submit state event
+                return;
+            case DELAY:
+            case RUNNING:
+                handleRunningEvent(taskEvent, taskInstance);
+                break;
+            case RESULT:
+                handleResultEvent(taskEvent, taskInstance);
+                break;
+            default:
+                throw new IllegalArgumentException("invalid event type : " + event);
+        }
+
+        StateEvent stateEvent = new StateEvent();
+        stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
+        stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId());
+        stateEvent.setExecutionStatus(taskEvent.getState());
+        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
+        workflowExecuteThreadPool.submitStateEvent(stateEvent);
+    }
+
+    /**
+     * handle dispatch event
+     */
+    private void handleDispatchEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
+        if (taskInstance == null) {
+            logger.error("taskInstance is null");
+            return;
+        }
+        if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
+            return;
+        }
+        taskInstance.setState(ExecutionStatus.DISPATCH);
+        taskInstance.setHost(taskEvent.getWorkerAddress());
+        processService.saveTaskInstance(taskInstance);
+    }
+
+    /**
+     * handle running event
+     */
+    private void handleRunningEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
+        Channel channel = taskEvent.getChannel();
+        try {
+            if (taskInstance != null) {
+                if (taskInstance.getState().typeIsFinished()) {
+                    logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState());
+                } else {
+                    taskInstance.setState(taskEvent.getState());
+                    taskInstance.setStartTime(taskEvent.getStartTime());
+                    taskInstance.setHost(taskEvent.getWorkerAddress());
+                    taskInstance.setLogPath(taskEvent.getLogPath());
+                    taskInstance.setExecutePath(taskEvent.getExecutePath());
+                    taskInstance.setPid(taskEvent.getProcessId());
+                    taskInstance.setAppLink(taskEvent.getAppIds());
+                    processService.saveTaskInstance(taskInstance);
+                }
+            }
+            // if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success
+            TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
+            channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
+        } catch (Exception e) {
+            logger.error("worker ack master error", e);
+            TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
+            channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
+        }
+    }
+
+    /**
+     * handle result event
+     */
+    private void handleResultEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
+        Channel channel = taskEvent.getChannel();
+        try {
+            if (taskInstance != null) {
+                dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
+
+                taskInstance.setStartTime(taskEvent.getStartTime());
+                taskInstance.setHost(taskEvent.getWorkerAddress());
+                taskInstance.setLogPath(taskEvent.getLogPath());
+                taskInstance.setExecutePath(taskEvent.getExecutePath());
+                taskInstance.setPid(taskEvent.getProcessId());
+                taskInstance.setAppLink(taskEvent.getAppIds());
+                taskInstance.setState(taskEvent.getState());
+                taskInstance.setEndTime(taskEvent.getEndTime());
+                taskInstance.setVarPool(taskEvent.getVarPool());
+                processService.changeOutParam(taskInstance);
+                processService.saveTaskInstance(taskInstance);
+            }
+            // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
+            TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
+            channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
+        } catch (Exception e) {
+            logger.error("worker response master error", e);
+            TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
+            channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
+        }
+    }
+}

+ 138 - 0
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java

@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.processor.queue;
+
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.annotation.PostConstruct;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+@Component
+public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
+
+    private static final Logger logger = LoggerFactory.getLogger(TaskExecuteThreadPool.class);
+
+    private final ConcurrentHashMap<String, TaskExecuteThread> multiThreadFilterMap = new ConcurrentHashMap<>();
+
+    @Autowired
+    private MasterConfig masterConfig;
+
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    /**
+     * process service
+     */
+    @Autowired
+    private ProcessService processService;
+
+    /**
+     * data quality result operator
+     */
+    @Autowired
+    private DataQualityResultOperator dataQualityResultOperator;
+
+
+    @Autowired
+    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
+    /**
+     * task event thread map
+     */
+    private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap = new ConcurrentHashMap<>();
+
+    @PostConstruct
+    private void init() {
+        this.setDaemon(true);
+        this.setThreadNamePrefix("Task-Execute-Thread-");
+        this.setMaxPoolSize(masterConfig.getExecThreads());
+        this.setCorePoolSize(masterConfig.getExecThreads());
+    }
+
+    public void submitTaskEvent(TaskEvent taskEvent) {
+        if (!processInstanceExecCacheManager.contains(taskEvent.getProcessInstanceId())) {
+            logger.warn("workflowExecuteThread is null, event: {}", taskEvent);
+            return;
+        }
+        if (!taskExecuteThreadMap.containsKey(taskEvent.getProcessInstanceId())) {
+            TaskExecuteThread taskExecuteThread = new TaskExecuteThread(
+                    taskEvent.getProcessInstanceId(),
+                    processService, workflowExecuteThreadPool,
+                    processInstanceExecCacheManager,
+                    dataQualityResultOperator);
+            taskExecuteThreadMap.put(taskEvent.getProcessInstanceId(), taskExecuteThread);
+        }
+        TaskExecuteThread taskExecuteThread = taskExecuteThreadMap.get(taskEvent.getProcessInstanceId());
+        if (taskExecuteThread != null) {
+            taskExecuteThread.addEvent(taskEvent);
+        }
+    }
+
+    public void eventHandler() {
+        for (TaskExecuteThread taskExecuteThread: taskExecuteThreadMap.values()) {
+            executeEvent(taskExecuteThread);
+        }
+    }
+
+    public void executeEvent(TaskExecuteThread taskExecuteThread) {
+        if (taskExecuteThread.eventSize() == 0) {
+            return;
+        }
+        if (multiThreadFilterMap.containsKey(taskExecuteThread.getKey())) {
+            return;
+        }
+        ListenableFuture future = this.submitListenable(() -> {
+            taskExecuteThread.run();
+            multiThreadFilterMap.put(taskExecuteThread.getKey(), taskExecuteThread);
+        });
+        future.addCallback(new ListenableFutureCallback() {
+            @Override
+            public void onFailure(Throwable ex) {
+                logger.error("handle event {} failed: {}", taskExecuteThread.getProcessInstanceId(), ex);
+                if (!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId())) {
+                    taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId());
+                    logger.info("remove process instance: {}", taskExecuteThread.getProcessInstanceId());
+                }
+                multiThreadFilterMap.remove(taskExecuteThread.getKey());
+            }
+
+            @Override
+            public void onSuccess(Object result) {
+                logger.info("persist events {} succeeded.", taskExecuteThread.getProcessInstanceId());
+                if (!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId())) {
+                    taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId());
+                    logger.info("remove process instance: {}", taskExecuteThread.getProcessInstanceId());
+                }
+                multiThreadFilterMap.remove(taskExecuteThread.getKey());
+            }
+        });
+    }
+}

+ 3 - 2
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java

@@ -67,6 +67,9 @@ public class TaskResponseServiceTest {
     @Mock
     private WorkflowExecuteThreadPool workflowExecuteThreadPool;
 
+    @Mock
+    private TaskExecuteThreadPool taskExecuteThreadPool;
+
     @Before
     public void before() {
         taskEventService.start();
@@ -101,8 +104,6 @@ public class TaskResponseServiceTest {
 
     @Test
     public void testAddResponse() {
-        Mockito.when(processService.findTaskInstanceById(Mockito.any())).thenReturn(taskInstance);
-        Mockito.when(channel.writeAndFlush(Mockito.any())).thenReturn(null);
         taskEventService.addEvent(ackEvent);
         taskEventService.addEvent(resultEvent);
     }