Browse Source

Fix failover Master might not release taskGroup (#15287)

Wenjun Ruan 1 year ago
parent
commit
14272dafab

+ 11 - 6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@@ -465,12 +465,13 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
      * release task group
      *
      */
-    public void releaseTaskGroup(TaskInstance taskInstance) throws InterruptedException {
+    public void releaseTaskGroup(TaskInstance taskInstance) {
         ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance();
         // todo: use Integer
         if (taskInstance.getTaskGroupId() <= 0) {
             log.info("The current TaskInstance: {} doesn't use taskGroup, no need to release taskGroup",
                     taskInstance.getName());
+            return;
         }
         TaskInstance nextTaskInstance = processService.releaseTaskGroup(taskInstance);
         if (nextTaskInstance == null) {
@@ -1347,9 +1348,11 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
                 TaskExecutionStatus state = existTaskInstance.getState();
                 if (state == TaskExecutionStatus.RUNNING_EXECUTION
                         || state == TaskExecutionStatus.DISPATCH
-                        || state == TaskExecutionStatus.SUBMITTED_SUCCESS) {
+                        || state == TaskExecutionStatus.SUBMITTED_SUCCESS
+                        || state == TaskExecutionStatus.DELAY_EXECUTION) {
                     // try to take over task instance
                     if (state != TaskExecutionStatus.SUBMITTED_SUCCESS
+                            && state != TaskExecutionStatus.DELAY_EXECUTION
                             && tryToTakeOverTaskInstance(existTaskInstance)) {
                         log.info("Success take over task {}", existTaskInstance.getName());
                         continue;
@@ -1357,6 +1360,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
                         // set the task instance state to fault tolerance
                         existTaskInstance.setFlag(Flag.NO);
                         existTaskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE);
+                        releaseTaskGroup(existTaskInstance);
+
                         validTaskMap.remove(existTaskInstance.getTaskCode());
                         taskInstanceDao.updateById(existTaskInstance);
                         existTaskInstance = cloneTolerantTaskInstance(existTaskInstance);
@@ -1444,12 +1449,12 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
             ITaskInstanceOperator iTaskInstanceOperator =
                     SingletonJdkDynamicRpcClientProxyFactory
                             .getProxyClient(taskInstance.getHost(), ITaskInstanceOperator.class);
-            UpdateWorkflowHostResponse updateWorkflowHostResponse = iTaskInstanceOperator.updateWorkflowInstanceHost(
+            UpdateWorkflowHostResponse response = iTaskInstanceOperator.updateWorkflowInstanceHost(
                     new UpdateWorkflowHostRequest(taskInstance.getId(), masterConfig.getMasterAddress()));
-            if (!updateWorkflowHostResponse.isSuccess()) {
+            if (!response.isSuccess()) {
                 log.error(
-                        "Takeover TaskInstance failed, receive a failed response from worker: {}, will try to create a new TaskInstance",
-                        taskInstance.getHost());
+                        "Takeover TaskInstance failed, receive a failed response: {} from worker: {}, will try to create a new TaskInstance",
+                        response, taskInstance.getHost());
                 return false;
             }