Browse Source

Merge pull request #4097 from CalvinKirs/ack_status

[FIX-#4084][server]fix taskInstance state change error
dailidong 4 years ago
parent
commit
656ec295b9

+ 98 - 80
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.server.master.consumer;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.enums.SqoopJobType;
 import org.apache.dolphinscheduler.common.enums.ResourceType;
+import org.apache.dolphinscheduler.common.enums.SqoopJobType;
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.enums.UdfType;
 import org.apache.dolphinscheduler.common.model.TaskNode;
@@ -33,10 +33,24 @@ import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
 import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
 import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
 import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.common.utils.*;
-import org.apache.dolphinscheduler.dao.entity.*;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.utils.EnumUtils;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
+import org.apache.dolphinscheduler.dao.entity.DataSource;
+import org.apache.dolphinscheduler.dao.entity.Resource;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.entity.UdfFunc;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
-import org.apache.dolphinscheduler.server.entity.*;
+import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskPriority;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
@@ -44,21 +58,27 @@ import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import javax.annotation.PostConstruct;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import javax.annotation.PostConstruct;
-import java.util.*;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
 /**
  * TaskUpdateQueue consumer
  */
 @Component
-public class TaskPriorityQueueConsumer extends Thread{
+public class TaskPriorityQueueConsumer extends Thread {
 
     /**
      * logger of TaskUpdateQueueConsumer
@@ -91,7 +111,7 @@ public class TaskPriorityQueueConsumer extends Thread{
     private MasterConfig masterConfig;
 
     @PostConstruct
-    public void init(){
+    public void init() {
         super.setName("TaskUpdateQueueConsumerThread");
         super.start();
     }
@@ -99,12 +119,12 @@ public class TaskPriorityQueueConsumer extends Thread{
     @Override
     public void run() {
         List<String> failedDispatchTasks = new ArrayList<>();
-        while (Stopper.isRunning()){
+        while (Stopper.isRunning()) {
             try {
                 int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber();
                 failedDispatchTasks.clear();
-                for(int i = 0; i < fetchTaskNum; i++){
-                    if(taskPriorityQueue.size() <= 0){
+                for (int i = 0; i < fetchTaskNum; i++) {
+                    if (taskPriorityQueue.size() <= 0) {
                         Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                         continue;
                     }
@@ -112,62 +132,62 @@ public class TaskPriorityQueueConsumer extends Thread{
                     String taskPriorityInfo = taskPriorityQueue.take();
                     TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo);
                     boolean dispatchResult = dispatch(taskPriority.getTaskId());
-                    if(!dispatchResult){
+                    if (!dispatchResult) {
                         failedDispatchTasks.add(taskPriorityInfo);
                     }
                 }
-                for(String dispatchFailedTask : failedDispatchTasks){
+                for (String dispatchFailedTask : failedDispatchTasks) {
                     taskPriorityQueue.put(dispatchFailedTask);
                 }
-            }catch (Exception e){
-                logger.error("dispatcher task error",e);
+            } catch (Exception e) {
+                logger.error("dispatcher task error", e);
             }
         }
     }
 
-
     /**
      * dispatch task
      *
      * @param taskInstanceId taskInstanceId
      * @return result
      */
-    private boolean dispatch(int taskInstanceId){
+    private boolean dispatch(int taskInstanceId) {
         boolean result = false;
         try {
             TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
             ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
 
-            if (taskInstanceIsFinalState(taskInstanceId)){
+            if (taskInstanceIsFinalState(taskInstanceId)) {
                 // when task finish, ignore this task, there is no need to dispatch anymore
                 return true;
-            }else{
+            } else {
                 result = dispatcher.dispatch(executionContext);
             }
         } catch (ExecuteException e) {
-            logger.error("dispatch error",e);
+            logger.error("dispatch error", e);
         }
         return result;
     }
 
-
     /**
      * taskInstance is final state
      * success,failure,kill,stop,pause,threadwaiting is final state
+     *
      * @param taskInstanceId taskInstanceId
      * @return taskInstance is final state
      */
-    public Boolean taskInstanceIsFinalState(int taskInstanceId){
+    public Boolean taskInstanceIsFinalState(int taskInstanceId) {
         TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
         return taskInstance.getState().typeIsFinished();
     }
 
     /**
      * get TaskExecutionContext
+     *
      * @param taskInstanceId taskInstanceId
      * @return TaskExecutionContext
      */
-    protected TaskExecutionContext getTaskExecutionContext(int taskInstanceId){
+    protected TaskExecutionContext getTaskExecutionContext(int taskInstanceId) {
         TaskInstance taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstanceId);
 
         // task type
@@ -181,12 +201,12 @@ public class TaskPriorityQueueConsumer extends Thread{
 
         // verify tenant is null
         if (verifyTenantIsNull(tenant, taskInstance)) {
-            processService.changeTaskState(ExecutionStatus.FAILURE,
-                    taskInstance.getStartTime(),
-                    taskInstance.getHost(),
-                    null,
-                    null,
-                    taskInstance.getId());
+            processService.changeTaskState(taskInstance, ExecutionStatus.FAILURE,
+                taskInstance.getStartTime(),
+                taskInstance.getHost(),
+                null,
+                null,
+                taskInstance.getId());
             return null;
         }
         // set queue for process instance, user-specified queue takes precedence over tenant queue
@@ -196,50 +216,47 @@ public class TaskPriorityQueueConsumer extends Thread{
         taskInstance.setExecutePath(getExecLocalPath(taskInstance));
         taskInstance.setResources(getResourceFullNames(taskNode));
 
-
         SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext();
         DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext();
         ProcedureTaskExecutionContext procedureTaskExecutionContext = new ProcedureTaskExecutionContext();
         SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext();
 
-
         // SQL task
-        if (taskType == TaskType.SQL){
+        if (taskType == TaskType.SQL) {
             setSQLTaskRelation(sqlTaskExecutionContext, taskNode);
 
         }
 
         // DATAX task
-        if (taskType == TaskType.DATAX){
+        if (taskType == TaskType.DATAX) {
             setDataxTaskRelation(dataxTaskExecutionContext, taskNode);
         }
 
-
         // procedure task
-        if (taskType == TaskType.PROCEDURE){
+        if (taskType == TaskType.PROCEDURE) {
             setProcedureTaskRelation(procedureTaskExecutionContext, taskNode);
         }
 
-        if (taskType == TaskType.SQOOP){
-            setSqoopTaskRelation(sqoopTaskExecutionContext,taskNode);
+        if (taskType == TaskType.SQOOP) {
+            setSqoopTaskRelation(sqoopTaskExecutionContext, taskNode);
         }
 
-
         return TaskExecutionContextBuilder.get()
-                .buildTaskInstanceRelatedInfo(taskInstance)
-                .buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
-                .buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
-                .buildSQLTaskRelatedInfo(sqlTaskExecutionContext)
-                .buildDataxTaskRelatedInfo(dataxTaskExecutionContext)
-                .buildProcedureTaskRelatedInfo(procedureTaskExecutionContext)
-                .buildSqoopTaskRelatedInfo(sqoopTaskExecutionContext)
-                .create();
+            .buildTaskInstanceRelatedInfo(taskInstance)
+            .buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
+            .buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
+            .buildSQLTaskRelatedInfo(sqlTaskExecutionContext)
+            .buildDataxTaskRelatedInfo(dataxTaskExecutionContext)
+            .buildProcedureTaskRelatedInfo(procedureTaskExecutionContext)
+            .buildSqoopTaskRelatedInfo(sqoopTaskExecutionContext)
+            .create();
     }
 
     /**
      * set procedure task relation
+     *
      * @param procedureTaskExecutionContext procedureTaskExecutionContext
-     * @param taskNode taskNode
+     * @param taskNode                      taskNode
      */
     private void setProcedureTaskRelation(ProcedureTaskExecutionContext procedureTaskExecutionContext, TaskNode taskNode) {
         ProcedureParameters procedureParameters = JSONUtils.parseObject(taskNode.getParams(), ProcedureParameters.class);
@@ -250,8 +267,9 @@ public class TaskPriorityQueueConsumer extends Thread{
 
     /**
      * set datax task relation
+     *
      * @param dataxTaskExecutionContext dataxTaskExecutionContext
-     * @param taskNode taskNode
+     * @param taskNode                  taskNode
      */
     private void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskNode taskNode) {
         DataxParameters dataxParameters = JSONUtils.parseObject(taskNode.getParams(), DataxParameters.class);
@@ -259,25 +277,24 @@ public class TaskPriorityQueueConsumer extends Thread{
         DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource());
         DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
 
-
-        if (dataSource != null){
+        if (dataSource != null) {
             dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource());
             dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
             dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
         }
 
-        if (dataTarget != null){
+        if (dataTarget != null) {
             dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget());
             dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
             dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
         }
     }
 
-
     /**
      * set sqoop task relation
+     *
      * @param sqoopTaskExecutionContext sqoopTaskExecutionContext
-     * @param taskNode taskNode
+     * @param taskNode                  taskNode
      */
     private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskNode taskNode) {
         SqoopParameters sqoopParameters = JSONUtils.parseObject(taskNode.getParams(), SqoopParameters.class);
@@ -290,13 +307,13 @@ public class TaskPriorityQueueConsumer extends Thread{
             DataSource dataSource = processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource());
             DataSource dataTarget = processService.findDataSourceById(targetMysqlParameter.getTargetDatasource());
 
-            if (dataSource != null){
+            if (dataSource != null) {
                 sqoopTaskExecutionContext.setDataSourceId(dataSource.getId());
                 sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
                 sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
             }
 
-            if (dataTarget != null){
+            if (dataTarget != null) {
                 sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId());
                 sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
                 sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
@@ -306,8 +323,9 @@ public class TaskPriorityQueueConsumer extends Thread{
 
     /**
      * set SQL task relation
+     *
      * @param sqlTaskExecutionContext sqlTaskExecutionContext
-     * @param taskNode taskNode
+     * @param taskNode                taskNode
      */
     private void setSQLTaskRelation(SQLTaskExecutionContext sqlTaskExecutionContext, TaskNode taskNode) {
         SqlParameters sqlParameters = JSONUtils.parseObject(taskNode.getParams(), SqlParameters.class);
@@ -317,20 +335,20 @@ public class TaskPriorityQueueConsumer extends Thread{
 
         // whether udf type
         boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType())
-                && StringUtils.isNotEmpty(sqlParameters.getUdfs());
+            && StringUtils.isNotEmpty(sqlParameters.getUdfs());
 
-        if (udfTypeFlag){
+        if (udfTypeFlag) {
             String[] udfFunIds = sqlParameters.getUdfs().split(",");
             int[] udfFunIdsArray = new int[udfFunIds.length];
-            for(int i = 0 ; i < udfFunIds.length;i++){
-                udfFunIdsArray[i]=Integer.parseInt(udfFunIds[i]);
+            for (int i = 0; i < udfFunIds.length; i++) {
+                udfFunIdsArray[i] = Integer.parseInt(udfFunIds[i]);
             }
 
             List<UdfFunc> udfFuncList = processService.queryUdfFunListByIds(udfFunIdsArray);
-            Map<UdfFunc,String> udfFuncMap = new HashMap<>();
-            for(UdfFunc udfFunc : udfFuncList) {
+            Map<UdfFunc, String> udfFuncMap = new HashMap<>();
+            for (UdfFunc udfFunc : udfFuncList) {
                 String tenantCode = processService.queryTenantCodeByResName(udfFunc.getResourceName(), ResourceType.UDF);
-                udfFuncMap.put(udfFunc,tenantCode);
+                udfFuncMap.put(udfFunc, tenantCode);
             }
 
             sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncMap);
@@ -342,25 +360,25 @@ public class TaskPriorityQueueConsumer extends Thread{
      *
      * @return execute local path
      */
-    private String getExecLocalPath(TaskInstance taskInstance){
+    private String getExecLocalPath(TaskInstance taskInstance) {
         return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
-                taskInstance.getProcessDefine().getId(),
-                taskInstance.getProcessInstance().getId(),
-                taskInstance.getId());
+            taskInstance.getProcessDefine().getId(),
+            taskInstance.getProcessInstance().getId(),
+            taskInstance.getId());
     }
 
-
     /**
-     *  whehter tenant is null
-     * @param tenant tenant
+     * whehter tenant is null
+     *
+     * @param tenant       tenant
      * @param taskInstance taskInstance
      * @return result
      */
     private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
-        if(tenant == null){
+        if (tenant == null) {
             logger.error("tenant not exists,process instance id : {},task instance id : {}",
-                    taskInstance.getProcessInstance().getId(),
-                    taskInstance.getId());
+                taskInstance.getProcessInstance().getId(),
+                taskInstance.getId());
             return true;
         }
         return false;
@@ -369,8 +387,8 @@ public class TaskPriorityQueueConsumer extends Thread{
     /**
      * get resource map key is full name and value is tenantCode
      */
-    private Map<String,String> getResourceFullNames(TaskNode taskNode) {
-        Map<String,String> resourceMap = new HashMap<>();
+    private Map<String, String> getResourceFullNames(TaskNode taskNode) {
+        Map<String, String> resourceMap = new HashMap<>();
         AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());
 
         if (baseParam != null) {
@@ -382,7 +400,7 @@ public class TaskPriorityQueueConsumer extends Thread{
                 if (CollectionUtils.isNotEmpty(oldVersionResources)) {
 
                     oldVersionResources.forEach(
-                            (t)->resourceMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE))
+                        (t) -> resourceMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE))
                     );
                 }
 
@@ -395,7 +413,7 @@ public class TaskPriorityQueueConsumer extends Thread{
 
                     List<Resource> resources = processService.listResourceByIds(resourceIds);
                     resources.forEach(
-                            (t)->resourceMap.put(t.getFullName(),processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE))
+                        (t) -> resourceMap.put(t.getFullName(), processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE))
                     );
                 }
             }

+ 46 - 44
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java

@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.server.master.processor.queue;
 
-import io.netty.channel.Channel;
 import org.apache.dolphinscheduler.common.enums.Event;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.thread.Stopper;
@@ -25,18 +24,22 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
 import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
 
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import io.netty.channel.Channel;
+
 /**
  * task manager
  */
@@ -65,21 +68,20 @@ public class TaskResponseService {
      */
     private Thread taskResponseWorker;
 
-
     @PostConstruct
-    public void start(){
+    public void start() {
         this.taskResponseWorker = new TaskResponseWorker();
         this.taskResponseWorker.setName("TaskResponseWorker");
         this.taskResponseWorker.start();
     }
 
     @PreDestroy
-    public void stop(){
+    public void stop() {
         this.taskResponseWorker.interrupt();
-        if(!eventQueue.isEmpty()){
+        if (!eventQueue.isEmpty()) {
             List<TaskResponseEvent> remainEvents = new ArrayList<>(eventQueue.size());
             eventQueue.drainTo(remainEvents);
-            for(TaskResponseEvent event : remainEvents){
+            for (TaskResponseEvent event : remainEvents) {
                 this.persist(event);
             }
         }
@@ -90,16 +92,15 @@ public class TaskResponseService {
      *
      * @param taskResponseEvent taskResponseEvent
      */
-    public void addResponse(TaskResponseEvent taskResponseEvent){
+    public void addResponse(TaskResponseEvent taskResponseEvent) {
         try {
             eventQueue.put(taskResponseEvent);
         } catch (InterruptedException e) {
-            logger.error("put task : {} error :{}", taskResponseEvent,e);
+            logger.error("put task : {} error :{}", taskResponseEvent, e);
             Thread.currentThread().interrupt();
         }
     }
 
-
     /**
      * task worker thread
      */
@@ -108,16 +109,16 @@ public class TaskResponseService {
         @Override
         public void run() {
 
-            while (Stopper.isRunning()){
+            while (Stopper.isRunning()) {
                 try {
                     // if not task , blocking here
                     TaskResponseEvent taskResponseEvent = eventQueue.take();
                     persist(taskResponseEvent);
-                } catch (InterruptedException e){
+                } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     break;
-                } catch (Exception e){
-                    logger.error("persist task error",e);
+                } catch (Exception e) {
+                    logger.error("persist task error", e);
                 }
             }
             logger.info("TaskResponseWorker stopped");
@@ -126,51 +127,52 @@ public class TaskResponseService {
 
     /**
      * persist  taskResponseEvent
+     *
      * @param taskResponseEvent taskResponseEvent
      */
-    private void persist(TaskResponseEvent taskResponseEvent){
+    private void persist(TaskResponseEvent taskResponseEvent) {
         Event event = taskResponseEvent.getEvent();
         Channel channel = taskResponseEvent.getChannel();
 
-        switch (event){
+        switch (event) {
             case ACK:
                 try {
                     TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
-                    if (taskInstance != null){
-                        processService.changeTaskState(taskResponseEvent.getState(),
-                                taskResponseEvent.getStartTime(),
-                                taskResponseEvent.getWorkerAddress(),
-                                taskResponseEvent.getExecutePath(),
-                                taskResponseEvent.getLogPath(),
-                                taskResponseEvent.getTaskInstanceId());
+                    if (taskInstance != null && !taskInstance.getState().typeIsFinished()) {
+                        processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
+                            taskResponseEvent.getStartTime(),
+                            taskResponseEvent.getWorkerAddress(),
+                            taskResponseEvent.getExecutePath(),
+                            taskResponseEvent.getLogPath(),
+                            taskResponseEvent.getTaskInstanceId());
                     }
                     // if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success
-                    DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(),taskResponseEvent.getTaskInstanceId());
+                    DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
                     channel.writeAndFlush(taskAckCommand.convert2Command());
-                }catch (Exception e){
-                    logger.error("worker ack master error",e);
-                    DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(),-1);
+                } catch (Exception e) {
+                    logger.error("worker ack master error", e);
+                    DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
                     channel.writeAndFlush(taskAckCommand.convert2Command());
                 }
                 break;
             case RESULT:
                 try {
                     TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
-                    if (taskInstance != null){
-                        processService.changeTaskState(taskResponseEvent.getState(),
-                                taskResponseEvent.getEndTime(),
-                                taskResponseEvent.getProcessId(),
-                                taskResponseEvent.getAppIds(),
-                                taskResponseEvent.getTaskInstanceId(),
-                                taskResponseEvent.getVarPool()
-                                );
+                    if (taskInstance != null) {
+                        processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
+                            taskResponseEvent.getEndTime(),
+                            taskResponseEvent.getProcessId(),
+                            taskResponseEvent.getAppIds(),
+                            taskResponseEvent.getTaskInstanceId(),
+                            taskResponseEvent.getVarPool()
+                        );
                     }
                     // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
-                    DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(),taskResponseEvent.getTaskInstanceId());
+                    DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
                     channel.writeAndFlush(taskResponseCommand.convert2Command());
-                }catch (Exception e){
-                    logger.error("worker response master error",e);
-                    DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(),-1);
+                } catch (Exception e) {
+                    logger.error("worker response master error", e);
+                    DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1);
                     channel.writeAndFlush(taskResponseCommand.convert2Command());
                 }
                 break;

+ 62 - 37
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java

@@ -14,55 +14,80 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.server.master.processor.queue;
 
+package org.apache.dolphinscheduler.server.master.processor.queue;
 
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.server.registry.DependencyConfig;
-import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
-import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
-import org.apache.dolphinscheduler.server.zk.SpringZKServer;
-import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
-import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
-import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
-import org.junit.Assert;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.Date;
+
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
 
-import java.util.Date;
+import io.netty.channel.Channel;
 
-@RunWith(SpringJUnit4ClassRunner.class)
-@ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, TaskResponseService.class, ZookeeperRegistryCenter.class,
-        ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskResponseService.class,
-        CuratorZookeeperClient.class})
+@RunWith(MockitoJUnitRunner.class)
 public class TaskResponseServiceTest {
 
-    @Autowired
-    private TaskResponseService taskResponseService;
+    @Mock(name = "processService")
+    private ProcessService processService;
 
-    @Test
-    public void testAdd(){
-        TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXECUTION, new Date(),
-                "", "", "", 1,null);
-        taskResponseService.addResponse(taskResponseEvent);
-        Assert.assertTrue(taskResponseService.getEventQueue().size() == 1);
-        try {
-            Thread.sleep(10);
-        } catch (InterruptedException ignore) {
-        }
-        //after sleep, inner worker will take the event
-        Assert.assertTrue(taskResponseService.getEventQueue().size() == 0);
+    @InjectMocks
+    TaskResponseService taskRspService;
+
+    @Mock
+    private Channel channel;
+
+    private TaskResponseEvent ackEvent;
+
+    private TaskResponseEvent resultEvent;
+
+    private TaskInstance taskInstance;
+
+    @Before
+    public void before() {
+        taskRspService.start();
+
+        ackEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXECUTION,
+            new Date(),
+            "127.*.*.*",
+            "path",
+            "logPath",
+            22,
+            channel);
+
+        resultEvent = TaskResponseEvent.newResult(ExecutionStatus.SUCCESS,
+            new Date(),
+            1,
+            "ids",
+            22,
+            "varPol",
+            channel);
+
+        taskInstance = new TaskInstance();
+        taskInstance.setId(22);
+        taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
     }
 
     @Test
-    public void testStop(){
-        TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXECUTION, new Date(),
-                "", "", "", 1,null);
-        taskResponseService.addResponse(taskResponseEvent);
-        taskResponseService.stop();
-        Assert.assertTrue(taskResponseService.getEventQueue().size() == 0);
+    public void testAddResponse() {
+        Mockito.when(processService.findTaskInstanceById(Mockito.any())).thenReturn(taskInstance);
+        Mockito.when(channel.writeAndFlush(Mockito.any())).thenReturn(null);
+        taskRspService.addResponse(ackEvent);
+        taskRspService.addResponse(resultEvent);
+    }
+
+    @After
+    public void after() {
+        taskRspService.stop();
     }
+
 }

File diff suppressed because it is too large
+ 243 - 160
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java


+ 1 - 0
pom.xml

@@ -835,6 +835,7 @@
                         <include>**/server/master/SubProcessTaskTest.java</include>
                         <include>**/server/master/processor/TaskAckProcessorTest.java</include>
                         <include>**/server/master/processor/TaskKillResponseProcessorTest.java</include>
+                        <include>**/server/master/processor/queue/TaskResponseServiceTest.java</include>
                         <include>**/server/register/ZookeeperNodeManagerTest.java</include>
                         <include>**/server/utils/DataxUtilsTest.java</include>
                         <include>**/server/utils/ExecutionContextTestUtils.java</include>