Browse Source

Fix serial mode will cause NPE in Workflow bootstrap (#14703)

Wenjun Ruan 1 năm trước cách đây
mục cha
commit
db62ce0e47

+ 8 - 1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java

@@ -40,6 +40,7 @@ import org.apache.commons.collections4.CollectionUtils;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -132,8 +133,14 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
                 commands.parallelStream()
                         .forEach(command -> {
                             try {
-                                WorkflowExecuteRunnable workflowExecuteRunnable =
+                                Optional<WorkflowExecuteRunnable> workflowExecuteRunnableOptional =
                                         workflowExecuteRunnableFactory.createWorkflowExecuteRunnable(command);
+                                if (!workflowExecuteRunnableOptional.isPresent()) {
+                                    log.warn(
+                                            "The command execute success, will not trigger a WorkflowExecuteRunnable, this workflowInstance might be in serial mode");
+                                    return;
+                                }
+                                WorkflowExecuteRunnable workflowExecuteRunnable = workflowExecuteRunnableOptional.get();
                                 ProcessInstance processInstance = workflowExecuteRunnable
                                         .getWorkflowExecuteContext().getWorkflowInstance();
                                 if (processInstanceExecCacheManager.contains(processInstance.getId())) {

+ 11 - 9
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContextFactory.java

@@ -29,6 +29,8 @@ import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
 import org.apache.dolphinscheduler.service.exceptions.CronParseException;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
+import java.util.Optional;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.springframework.beans.factory.annotation.Autowired;
@@ -50,21 +52,22 @@ public class WorkflowExecuteContextFactory {
     @Autowired
     private MasterConfig masterConfig;
 
-    public IWorkflowExecuteContext createWorkflowExecuteRunnableContext(Command command) throws Exception {
-        ProcessInstance workflowInstance = createWorkflowInstance(command);
+    public Optional<IWorkflowExecuteContext> createWorkflowExecuteRunnableContext(Command command) throws Exception {
+        Optional<ProcessInstance> workflowInstanceOptional = createWorkflowInstance(command);
+        if (!workflowInstanceOptional.isPresent()) {
+            return Optional.empty();
+        }
+        ProcessInstance workflowInstance = workflowInstanceOptional.get();
         ProcessDefinition workflowDefinition = processService.findProcessDefinition(
                 workflowInstance.getProcessDefinitionCode(), workflowInstance.getProcessDefinitionVersion());
         workflowInstance.setProcessDefinition(workflowDefinition);
 
         IWorkflowGraph workflowGraph = workflowGraphFactory.createWorkflowGraph(workflowInstance);
 
-        return new WorkflowExecuteContext(
-                workflowDefinition,
-                workflowInstance,
-                workflowGraph);
+        return Optional.of(new WorkflowExecuteContext(workflowDefinition, workflowInstance, workflowGraph));
     }
 
-    private ProcessInstance createWorkflowInstance(Command command) throws CronParseException {
+    private Optional<ProcessInstance> createWorkflowInstance(Command command) throws CronParseException {
         long commandTransformStartTime = System.currentTimeMillis();
         // Note: this check is not safe, the slot may change after command transform.
         // We use the database transaction in `handleCommand` so that we can guarantee the command will
@@ -76,10 +79,9 @@ public class WorkflowExecuteContextFactory {
             throw new RuntimeException("Slot check failed the current state: " + slotCheckState);
         }
         ProcessInstance processInstance = processService.handleCommand(masterConfig.getMasterAddress(), command);
-        log.info("Master handle command {} end, create process instance {}", command.getId(), processInstance.getId());
         ProcessInstanceMetrics
                 .recordProcessInstanceGenerateTime(System.currentTimeMillis() - commandTransformStartTime);
-        return processInstance;
+        return Optional.ofNullable(processInstance);
     }
 
     private SlotCheckState slotCheck(Command command) {

+ 7 - 9
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableFactory.java

@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
-import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.exception.WorkflowCreateException;
@@ -30,6 +29,8 @@ import org.apache.dolphinscheduler.service.command.CommandService;
 import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
+import java.util.Optional;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.springframework.beans.factory.annotation.Autowired;
@@ -66,21 +67,18 @@ public class WorkflowExecuteRunnableFactory {
     @Autowired
     private MasterConfig masterConfig;
 
-    @Autowired
-    private TaskDefinitionLogDao taskDefinitionLogDao;
-
     @Autowired
     private DefaultTaskExecuteRunnableFactory defaultTaskExecuteRunnableFactory;
 
     @Autowired
     private WorkflowExecuteContextFactory workflowExecuteContextFactory;
 
-    public WorkflowExecuteRunnable createWorkflowExecuteRunnable(Command command) throws WorkflowCreateException {
+    public Optional<WorkflowExecuteRunnable> createWorkflowExecuteRunnable(Command command) throws WorkflowCreateException {
         try {
-            IWorkflowExecuteContext workflowExecuteRunnableContext =
+            Optional<IWorkflowExecuteContext> workflowExecuteRunnableContextOptional =
                     workflowExecuteContextFactory.createWorkflowExecuteRunnableContext(command);
-            return new WorkflowExecuteRunnable(
-                    workflowExecuteRunnableContext,
+            return workflowExecuteRunnableContextOptional.map(iWorkflowExecuteContext -> new WorkflowExecuteRunnable(
+                    iWorkflowExecuteContext,
                     commandService,
                     processService,
                     processInstanceDao,
@@ -90,7 +88,7 @@ public class WorkflowExecuteRunnableFactory {
                     stateWheelExecuteThread,
                     curingGlobalParamsService,
                     taskInstanceDao,
-                    defaultTaskExecuteRunnableFactory);
+                    defaultTaskExecuteRunnableFactory));
         } catch (Exception ex) {
             throw new WorkflowCreateException("Create workflow execute runnable failed", ex);
         }

+ 3 - 0
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@@ -56,11 +56,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import javax.annotation.Nullable;
+
 import org.springframework.transaction.annotation.Transactional;
 
 public interface ProcessService {
 
     @Transactional
+    @Nullable
     ProcessInstance handleCommand(String host,
                                   Command command) throws CronParseException, CodeGenerateUtils.CodeGenerateException;
 

+ 4 - 2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@@ -303,6 +303,7 @@ public class ProcessServiceImpl implements ProcessService {
     @Autowired
     private TriggerRelationService triggerRelationService;
     /**
+     * todo: split this method
      * handle Command (construct ProcessInstance from Command) , wrapped in transaction
      *
      * @param host    host
@@ -311,8 +312,8 @@ public class ProcessServiceImpl implements ProcessService {
      */
     @Override
     @Transactional
-    public ProcessInstance handleCommand(String host,
-                                         Command command) throws CronParseException, CodeGenerateException {
+    public @Nullable ProcessInstance handleCommand(String host,
+                                                   Command command) throws CronParseException, CodeGenerateException {
         ProcessInstance processInstance = constructProcessInstance(command, host);
         // cannot construct process instance, return null
         if (processInstance == null) {
@@ -332,6 +333,7 @@ public class ProcessServiceImpl implements ProcessService {
                 setSubProcessParam(processInstance);
                 triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId());
                 deleteCommandWithCheck(command.getId());
+                // todo: this is a bad design to return null here, whether trigger the task
                 return null;
             }
         } else {