Przeglądaj źródła

[Fix] Fix running task instance throught api gots failed (#14433)

* update logic

* split method
旺阳 1 rok temu
rodzic
commit
d38d504332

+ 6 - 1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java

@@ -54,6 +54,7 @@ import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
 import org.apache.dolphinscheduler.server.master.runner.dispatcher.WorkerTaskDispatcher;
 import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
 import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
+import org.apache.dolphinscheduler.server.master.runner.execute.TaskExecutionContextFactory;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
@@ -104,6 +105,8 @@ public class StreamTaskExecuteRunnable implements Runnable {
 
     protected TaskExecuteStartMessage taskExecuteStartMessage;
 
+    protected TaskExecutionContextFactory taskExecutionContextFactory;
+
     /**
      * task event queue
      */
@@ -122,6 +125,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
                 SpringApplicationContext.getBean(StreamTaskInstanceExecCacheManager.class);
         this.taskDefinition = taskDefinition;
         this.taskExecuteStartMessage = taskExecuteStartMessage;
+        this.taskExecutionContextFactory = SpringApplicationContext.getBean(TaskExecutionContextFactory.class);
     }
 
     public TaskInstance getTaskInstance() {
@@ -337,7 +341,8 @@ public class StreamTaskExecuteRunnable implements Runnable {
         taskExecutionContext.setProcessDefineVersion(processDefinition.getVersion());
         // process instance id default 0
         taskExecutionContext.setProcessInstanceId(0);
-
+        taskExecutionContextFactory.setDataQualityTaskExecutionContext(taskExecutionContext, taskInstance, tenantCode);
+        taskExecutionContextFactory.setK8sTaskRelatedInfo(taskExecutionContext, taskInstance);
         return taskExecutionContext;
     }
 

+ 21 - 12
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecutionContextFactory.java

@@ -121,33 +121,42 @@ public class TaskExecutionContextFactory {
                         .orElse(null);
         setTaskResourceInfo(resources);
 
-        // TODO to be optimized
-        DataQualityTaskExecutionContext dataQualityTaskExecutionContext = null;
-        if (TASK_TYPE_DATA_QUALITY.equalsIgnoreCase(taskInstance.getTaskType())) {
-            dataQualityTaskExecutionContext = new DataQualityTaskExecutionContext();
-            setDataQualityTaskRelation(dataQualityTaskExecutionContext, taskInstance, workflowInstance.getTenantCode());
-        }
-
-        K8sTaskExecutionContext k8sTaskExecutionContext = setK8sTaskRelation(taskInstance);
-
         Map<String, Property> businessParamsMap = curingParamsService.preBuildBusinessParams(workflowInstance);
 
         AbstractParameters baseParam = taskPluginManager.getParameters(ParametersNode.builder()
                 .taskType(taskInstance.getTaskType()).taskParams(taskInstance.getTaskParams()).build());
         Map<String, Property> propertyMap =
                 curingParamsService.paramParsingPreparation(taskInstance, baseParam, workflowInstance);
-        return TaskExecutionContextBuilder.get()
+        TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
                 .buildWorkflowInstanceHost(masterConfig.getMasterAddress())
                 .buildTaskInstanceRelatedInfo(taskInstance)
                 .buildTaskDefinitionRelatedInfo(taskInstance.getTaskDefine())
                 .buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
                 .buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
                 .buildResourceParametersInfo(resources)
-                .buildDataQualityTaskExecutionContext(dataQualityTaskExecutionContext)
-                .buildK8sTaskRelatedInfo(k8sTaskExecutionContext)
                 .buildBusinessParamsMap(businessParamsMap)
                 .buildParamInfo(propertyMap)
                 .create();
+
+        setDataQualityTaskExecutionContext(taskExecutionContext, taskInstance, workflowInstance.getTenantCode());
+        setK8sTaskRelatedInfo(taskExecutionContext, taskInstance);
+        return taskExecutionContext;
+    }
+
+    public void setDataQualityTaskExecutionContext(TaskExecutionContext taskExecutionContext, TaskInstance taskInstance,
+                                                   String tenantCode) {
+        // TODO to be optimized
+        DataQualityTaskExecutionContext dataQualityTaskExecutionContext = null;
+        if (TASK_TYPE_DATA_QUALITY.equalsIgnoreCase(taskInstance.getTaskType())) {
+            dataQualityTaskExecutionContext = new DataQualityTaskExecutionContext();
+            setDataQualityTaskRelation(dataQualityTaskExecutionContext, taskInstance, tenantCode);
+        }
+        taskExecutionContext.setDataQualityTaskExecutionContext(dataQualityTaskExecutionContext);
+    }
+
+    public void setK8sTaskRelatedInfo(TaskExecutionContext taskExecutionContext, TaskInstance taskInstance) {
+        K8sTaskExecutionContext k8sTaskExecutionContext = setK8sTaskRelation(taskInstance);
+        taskExecutionContext.setK8sTaskExecutionContext(k8sTaskExecutionContext);
     }
 
     private Map<String, String> getResourceFullNames(TaskInstance taskInstance) {