Browse Source

[Bug-6455][Master]fix bug 6455: cannot stop sub-task (#6458)

* fix bug: cannot stop the task.

* fix bug: cannot stop the task.

* remove the check thread number
OS 3 years ago
parent
commit
a8baa9553f

+ 5 - 1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java

@@ -61,8 +61,12 @@ public class StateEventProcessor implements NettyRequestProcessor {
 
         StateEventChangeCommand stateEventChangeCommand = JSONUtils.parseObject(command.getBody(), StateEventChangeCommand.class);
         StateEvent stateEvent = new StateEvent();
-        stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
         stateEvent.setKey(stateEventChangeCommand.getKey());
+        if (stateEventChangeCommand.getSourceProcessInstanceId() != stateEventChangeCommand.getDestProcessInstanceId()) {
+            stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
+        } else {
+            stateEvent.setExecutionStatus(stateEventChangeCommand.getSourceStatus());
+        }
         stateEvent.setProcessInstanceId(stateEventChangeCommand.getDestProcessInstanceId());
         stateEvent.setTaskInstanceId(stateEventChangeCommand.getDestTaskInstanceId());
         StateEventType type = stateEvent.getTaskInstanceId() == 0 ? StateEventType.PROCESS_STATE_CHANGE : StateEventType.TASK_STATE_CHANGE;

+ 1 - 1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@@ -416,7 +416,7 @@ public class WorkflowExecuteThread implements Runnable {
             if (stateEvent.getExecutionStatus().typeIsFinished()) {
                 endProcess();
             }
-            if (stateEvent.getExecutionStatus() == ExecutionStatus.READY_STOP) {
+            if (processInstance.getState() == ExecutionStatus.READY_STOP) {
                 killAllTasks();
             }
             return true;

+ 1 - 1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java

@@ -55,7 +55,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
     MasterConfig masterConfig;
 
     @Autowired
-    NettyExecutorManager nettyExecutorManager;
+    NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
 
     /**
      * logger of MasterBaseTaskExecThread

+ 16 - 2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java

@@ -23,6 +23,9 @@ import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
+import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import java.util.Date;
 import java.util.concurrent.locks.Lock;
@@ -43,6 +46,8 @@ public class SubTaskProcessor extends BaseTaskProcessor {
      */
     private final Lock runLock = new ReentrantLock();
 
+    private StateEventCallbackService stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class);
+
     @Override
     public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
         this.processInstance = processInstance;
@@ -121,8 +126,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
         }
         subProcessInstance.setState(ExecutionStatus.READY_PAUSE);
         processService.updateProcessInstance(subProcessInstance);
-        //TODO...
-        // send event to sub process master
+        sendToSubProcess();
         return true;
     }
 
@@ -157,9 +161,19 @@ public class SubTaskProcessor extends BaseTaskProcessor {
         }
         subProcessInstance.setState(ExecutionStatus.READY_STOP);
         processService.updateProcessInstance(subProcessInstance);
+        sendToSubProcess();
         return true;
     }
 
+    private void sendToSubProcess() {
+        StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
+                processInstance.getId(), taskInstance.getId(), subProcessInstance.getState(), subProcessInstance.getId(), 0
+        );
+        String address = subProcessInstance.getHost().split(":")[0];
+        int port = Integer.parseInt(subProcessInstance.getHost().split(":")[1]);
+        this.stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());
+    }
+
     @Override
     public String getType() {
         return TaskType.SUB_PROCESS.getDesc();

+ 0 - 4
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@@ -222,10 +222,6 @@ public class ProcessService {
             moveToErrorCommand(command, "process instance is null");
             return null;
         }
-        if (!checkThreadNum(command, validThreadNum)) {
-            logger.info("there is not enough thread for this command: {}", command);
-            return setWaitingThreadProcess(command, processInstance);
-        }
         processInstance.setCommandType(command.getCommandType());
         processInstance.addHistoryCmd(command.getCommandType());
         saveProcessInstance(processInstance);