瀏覽代碼

fix task dispatch error overload resource pool of task group (#9667)

caishunfeng 3 年之前
父節點
當前提交
88d2803fe1

+ 0 - 4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@@ -351,8 +351,6 @@ public class WorkflowExecuteThread {
         if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) {
             TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
             ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
-            ProcessInstance processInstance = this.processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
-            taskProcessor.init(taskInstance, processInstance);
             taskProcessor.action(TaskAction.DISPATCH);
             this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
             return true;
@@ -362,8 +360,6 @@ public class WorkflowExecuteThread {
             if (acquireTaskGroup) {
                 TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
                 ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
-                ProcessInstance processInstance = this.processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
-                taskProcessor.init(taskInstance, processInstance);
                 taskProcessor.action(TaskAction.DISPATCH);
                 return true;
             }

+ 6 - 1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java

@@ -61,7 +61,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
                     taskInstance.getName(),
                     taskGroupId,
                     taskInstance.getProcessInstanceId(),
-                    taskInstance.getTaskInstancePriority().getCode());
+                    taskInstance.getTaskGroupPriority());
             if (!acquireTaskGroup) {
                 logger.info("submit task name :{}, but the first time to try to acquire task group failed", taskInstance.getName());
                 return true;
@@ -117,6 +117,11 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
                     taskInstance.getId(), org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
 
             TaskExecutionContext taskExecutionContext = getTaskExecutionContext(taskInstance);
+            if (taskExecutionContext == null) {
+                logger.error("task get taskExecutionContext fail: {}", taskInstance);
+                return false;
+            }
+
             taskPriority.setTaskExecutionContext(taskExecutionContext);
 
             taskUpdateQueue.put(taskPriority);

+ 1 - 1
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@@ -1490,7 +1490,6 @@ public class ProcessServiceImpl implements ProcessService {
             taskInstance.setState(ExecutionStatus.PAUSE);
         }
         taskInstance.setExecutorId(processInstance.getExecutorId());
-        taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
         taskInstance.setState(getSubmitTaskState(taskInstance, processInstance));
         if (taskInstance.getSubmitTime() == null) {
             taskInstance.setSubmitTime(new Date());
@@ -1670,6 +1669,7 @@ public class ProcessServiceImpl implements ProcessService {
     public void packageTaskInstance(TaskInstance taskInstance, ProcessInstance processInstance) {
         taskInstance.setProcessInstance(processInstance);
         taskInstance.setProcessDefine(processInstance.getProcessDefinition());
+        taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
         TaskDefinition taskDefinition = this.findTaskDefinition(
             taskInstance.getTaskCode(),
             taskInstance.getTaskDefinitionVersion());