Browse Source

Allow execute task in workflow instance (#13103)

JieguangZhou 2 years ago
parent
commit
e4b9b67255
22 changed files with 528 additions and 14 deletions
  1. 32 0
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
  2. 30 0
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflowInstance/WorkflowExecuteResponse.java
  3. 1 1
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java
  4. 6 0
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  5. 15 0
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  6. 115 0
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  7. 48 0
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
  8. 3 0
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
  9. 9 0
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java
  10. 8 0
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
  11. 5 0
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
  12. 72 0
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  13. 58 0
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
  14. 18 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  15. 4 0
      dolphinscheduler-ui/src/common/common.ts
  16. 4 0
      dolphinscheduler-ui/src/locales/en_US/project.ts
  17. 4 0
      dolphinscheduler-ui/src/locales/zh_CN/project.ts
  18. 9 0
      dolphinscheduler-ui/src/service/modules/executors/index.ts
  19. 7 0
      dolphinscheduler-ui/src/service/modules/executors/types.ts
  20. 53 8
      dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag-context-menu.tsx
  21. 25 0
      dolphinscheduler-ui/src/views/projects/workflow/components/dag/index.tsx
  22. 2 3
      dolphinscheduler-ui/src/views/projects/workflow/instance/detail/index.tsx

+ 32 - 0
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java

@@ -450,4 +450,36 @@ public class ExecutorController extends BaseController {
                 warningGroupId, workerGroup, environmentCode, startParamMap, dryRun);
         return returnDataList(result);
     }
+
+    /**
+     * do action to process instance: pause, stop, repeat, recover from pause, recover from stop
+     *
+     * @param loginUser login user
+     * @param projectCode project code
+     * @param processInstanceId process instance id
+     * @param startNodeList start node list
+     * @param taskDependType task depend type
+     * @return execute result code
+     */
+    @Operation(summary = "execute-task", description = "EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES")
+    @Parameters({
+            @Parameter(name = "processInstanceId", description = "PROCESS_INSTANCE_ID", required = true, schema = @Schema(implementation = int.class, example = "100")),
+            @Parameter(name = "startNodeList", description = "START_NODE_LIST", required = true, schema = @Schema(implementation = String.class)),
+            @Parameter(name = "taskDependType", description = "TASK_DEPEND_TYPE", required = true, schema = @Schema(implementation = TaskDependType.class))
+    })
+    @PostMapping(value = "/execute-task")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(EXECUTE_PROCESS_INSTANCE_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result executeTask(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                              @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
+                              @RequestParam("processInstanceId") Integer processInstanceId,
+                              @RequestParam("startNodeList") String startNodeList,
+                              @RequestParam("taskDependType") TaskDependType taskDependType) {
+        logger.info("Start to execute task in process instance, projectCode:{}, processInstanceId:{}.",
+                projectCode,
+                processInstanceId);
+        return execService.executeTask(loginUser, projectCode, processInstanceId, startNodeList, taskDependType);
+    }
+
 }

+ 30 - 0
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflowInstance/WorkflowExecuteResponse.java

@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.dto.workflowInstance;
+
+import org.apache.dolphinscheduler.api.utils.Result;
+
+import lombok.Data;
+
+/**
+ * user List response
+ */
+@Data
+public class WorkflowExecuteResponse extends Result {
+
+}

+ 1 - 1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java

@@ -30,7 +30,7 @@ public enum ExecuteType {
      * 4 stop
      * 5 pause
      */
-    NONE, REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, START_FAILURE_TASK_PROCESS, STOP, PAUSE;
+    NONE, REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, START_FAILURE_TASK_PROCESS, STOP, PAUSE, EXECUTE_TASK;
 
     public static ExecuteType getEnum(int value) {
         for (ExecuteType e : ExecuteType.values()) {

+ 6 - 0
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@@ -261,6 +261,9 @@ public enum Status {
     SCHEDULE_ALREADY_EXISTS(10204, "workflow {0} schedule {1} already exist, please update or delete it",
             "工作流 {0} 的定时 {1} 已经存在,请更新或删除"),
 
+    EXECUTE_NOT_DEFINE_TASK(10204, "please save and try again",
+            "请先保存后再执行"),
+
     UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
     UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
     RESOURCE_NOT_EXIST(20004, "resource not exist", "资源不存在"),
@@ -380,6 +383,9 @@ public enum Status {
     PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR(50070, "batch update process task relation error",
             "批量修改工作流任务关系错误"),
 
+    WORKFLOW_INSTANCE_IS_NOT_FINISHED(50071, "the workflow instance is not finished, can not do this operation",
+            "工作流实例未结束,不能执行此操作"),
+
     HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
     STORAGE_NOT_STARTUP(60002, "storage not startup", "存储未启用"),
     S3_CANNOT_RENAME(60003, "directory cannot be renamed", "S3无法重命名文件夹"),

+ 15 - 0
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.api.service;
 
+import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowExecuteResponse;
 import org.apache.dolphinscheduler.api.enums.ExecuteType;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
@@ -92,6 +93,20 @@ public interface ExecutorService {
      */
     Map<String, Object> execute(User loginUser, long projectCode, Integer processInstanceId, ExecuteType executeType);
 
+    /**
+     * do action to execute task in process instance
+     *
+     * @param loginUser login user
+     * @param projectCode project code
+     * @param processInstanceId process instance id
+     * @param startNodeList start node list
+     * @param taskDependType task depend type
+     * @return execute result code
+     */
+    WorkflowExecuteResponse executeTask(User loginUser, long projectCode, Integer processInstanceId,
+                                        String startNodeList,
+                                        TaskDependType taskDependType);
+
     /**
      * do action to process instance:pause, stop, repeat, recover from pause, recover from stop
      *

+ 115 - 0
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@@ -30,6 +30,7 @@ import static org.apache.dolphinscheduler.common.constants.Constants.MAX_TASK_TI
 import static org.apache.dolphinscheduler.common.constants.Constants.SCHEDULE_TIME_MAX_LENGTH;
 
 import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
+import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowExecuteResponse;
 import org.apache.dolphinscheduler.api.enums.ExecuteType;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.exceptions.ServiceException;
@@ -68,6 +69,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
@@ -142,6 +144,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
     @Autowired
     private ProcessInstanceDao processInstanceDao;
 
+    @Autowired
+    private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
     @Autowired
     private StateEventCallbackService stateEventCallbackService;
 
@@ -487,6 +492,116 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         return execute(loginUser, processDefinition.getProjectCode(), workflowInstanceId, executeType);
     }
 
+    /**
+     * do action to execute task in process instance
+     *
+     * @param loginUser login user
+     * @param projectCode project code
+     * @param processInstanceId process instance id
+     * @param startNodeList start node list
+     * @param taskDependType task depend type
+     * @return execute result code
+     */
+    @Override
+    public WorkflowExecuteResponse executeTask(User loginUser, long projectCode, Integer processInstanceId,
+                                               String startNodeList, TaskDependType taskDependType) {
+
+        WorkflowExecuteResponse response = new WorkflowExecuteResponse();
+
+        Project project = projectMapper.queryByCode(projectCode);
+        // check user access for project
+
+        projectService.checkProjectAndAuthThrowException(loginUser, project,
+                ApiFuncIdentificationConstant.map.get(ExecuteType.EXECUTE_TASK));
+
+        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId)
+                .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
+
+        if (!processInstance.getState().isFinished()) {
+            logger.error("Can not execute task for process instance which is not finished, processInstanceId:{}.",
+                    processInstanceId);
+            putMsg(response, Status.WORKFLOW_INSTANCE_IS_NOT_FINISHED);
+            return response;
+        }
+
+        ProcessDefinition processDefinition =
+                processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
+                        processInstance.getProcessDefinitionVersion());
+        processDefinition.setReleaseState(ReleaseState.ONLINE);
+        this.checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode(),
+                processInstance.getProcessDefinitionVersion());
+
+        if (!checkTenantSuitable(processDefinition)) {
+            logger.error(
+                    "There is not any valid tenant for the process definition, processDefinitionId:{}, processDefinitionCode:{}, ",
+                    processDefinition.getId(), processDefinition.getName());
+            putMsg(response, Status.TENANT_NOT_SUITABLE);
+            return response;
+        }
+
+        // get the startParams user specified at the first starting while repeat running is needed
+
+        long startNodeListLong;
+        try {
+            startNodeListLong = Long.parseLong(startNodeList);
+        } catch (NumberFormatException e) {
+            logger.error("startNodeList is not a number");
+            putMsg(response, Status.REQUEST_PARAMS_NOT_VALID_ERROR, startNodeList);
+            return response;
+        }
+
+        if (taskDefinitionLogMapper.queryMaxVersionForDefinition(startNodeListLong) == null) {
+            putMsg(response, Status.EXECUTE_NOT_DEFINE_TASK);
+            return response;
+        }
+
+        // To add startParams only when repeat running is needed
+        Map<String, Object> cmdParam = new HashMap<>();
+        cmdParam.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstanceId);
+        // Add StartNodeList
+        cmdParam.put(CMD_PARAM_START_NODES, startNodeList);
+
+        Command command = new Command();
+        command.setCommandType(CommandType.EXECUTE_TASK);
+        command.setProcessDefinitionCode(processDefinition.getCode());
+        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+        command.setExecutorId(loginUser.getId());
+        command.setProcessDefinitionVersion(processDefinition.getVersion());
+        command.setProcessInstanceId(processInstanceId);
+        command.setTestFlag(processInstance.getTestFlag());
+
+        // Add taskDependType
+        command.setTaskDependType(taskDependType);
+
+        if (!commandService.verifyIsNeedCreateCommand(command)) {
+            logger.warn(
+                    "Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
+                    processDefinition.getCode(), processDefinition.getVersion(), processInstanceId);
+            putMsg(response, Status.PROCESS_INSTANCE_EXECUTING_COMMAND,
+                    String.valueOf(processDefinition.getCode()));
+            return response;
+        }
+
+        logger.info("Creating command, commandInfo:{}.", command);
+        int create = commandService.createCommand(command);
+
+        if (create > 0) {
+            logger.info("Create {} command complete, processDefinitionCode:{}, processDefinitionVersion:{}.",
+                    command.getCommandType().getDescp(), command.getProcessDefinitionCode(),
+                    processDefinition.getVersion());
+            putMsg(response, Status.SUCCESS);
+        } else {
+            logger.error(
+                    "Execute process instance failed because create {} command error, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
+                    command.getCommandType().getDescp(), command.getProcessDefinitionCode(),
+                    processDefinition.getVersion(),
+                    processInstanceId);
+            putMsg(response, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
+        }
+
+        return response;
+    }
+
     @Override
     public Map<String, Object> forceStartTaskInstance(User loginUser, int queueId) {
         Map<String, Object> result = new HashMap<>();

+ 48 - 0
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java

@@ -21,10 +21,13 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowExecuteResponse;
 import org.apache.dolphinscheduler.api.enums.ExecuteType;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
@@ -38,6 +41,7 @@ import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.enums.RunMode;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
 import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
 import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
@@ -55,6 +59,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
 import org.apache.dolphinscheduler.service.command.CommandService;
@@ -118,6 +123,9 @@ public class ExecutorServiceTest {
     @Mock
     private TaskDefinitionMapper taskDefinitionMapper;
 
+    @Mock
+    private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
     @Mock
     private ProjectMapper projectMapper;
 
@@ -537,4 +545,44 @@ public class ExecutorServiceTest {
         Assertions.assertEquals("4,4", result.get(2));
     }
 
+    @Test
+    public void testExecuteTask() {
+        String startNodeList = "1234567870";
+        TaskDependType taskDependType = TaskDependType.TASK_ONLY;
+
+        ProcessInstance processInstanceMock = Mockito.mock(ProcessInstance.class, RETURNS_DEEP_STUBS);
+        Mockito.when(processService.findProcessInstanceDetailById(processInstanceId))
+                .thenReturn(Optional.ofNullable(processInstanceMock));
+
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setProjectCode(projectCode);
+        Mockito.when(processService.findProcessDefinition(Mockito.anyLong(), Mockito.anyInt()))
+                .thenReturn(processDefinition);
+
+        Mockito.when(processService.getTenantForProcess(Mockito.anyInt(), Mockito.anyInt())).thenReturn(new Tenant());
+
+        when(processInstanceMock.getState().isFinished()).thenReturn(false);
+        WorkflowExecuteResponse responseInstanceIsNotFinished =
+                executorService.executeTask(loginUser, projectCode, processInstanceId, startNodeList, taskDependType);
+        Assertions.assertEquals(Status.WORKFLOW_INSTANCE_IS_NOT_FINISHED.getCode(),
+                responseInstanceIsNotFinished.getCode());
+
+        when(processInstanceMock.getState().isFinished()).thenReturn(true);
+        WorkflowExecuteResponse responseStartNodeListError =
+                executorService.executeTask(loginUser, projectCode, processInstanceId, "1234567870,", taskDependType);
+        Assertions.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(), responseStartNodeListError.getCode());
+
+        Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(Mockito.anyLong())).thenReturn(null);
+        WorkflowExecuteResponse responseNotDefineTask =
+                executorService.executeTask(loginUser, projectCode, processInstanceId, startNodeList, taskDependType);
+        Assertions.assertEquals(Status.EXECUTE_NOT_DEFINE_TASK.getCode(), responseNotDefineTask.getCode());
+
+        Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(Mockito.anyLong())).thenReturn(1);
+        Mockito.when(commandService.verifyIsNeedCreateCommand(any())).thenReturn(true);
+        WorkflowExecuteResponse responseSuccess =
+                executorService.executeTask(loginUser, projectCode, processInstanceId, startNodeList, taskDependType);
+        Assertions.assertEquals(Status.SUCCESS.getCode(), responseSuccess.getCode());
+
+    }
+
 }

+ 3 - 0
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java

@@ -40,6 +40,8 @@ public enum CommandType {
      * 8 pause a process
      * 9 stop a process
      * 10 recover waiting thread
+     * 11 recover serial wait
+     * 12 start a task node in a process instance
      */
     START_PROCESS(0, "start a new process"),
     START_CURRENT_TASK_PROCESS(1, "start a new process from current nodes"),
@@ -53,6 +55,7 @@ public enum CommandType {
     STOP(9, "stop a process"),
     RECOVER_WAITING_THREAD(10, "recover waiting thread"),
     RECOVER_SERIAL_WAIT(11, "recover serial wait"),
+    EXECUTE_TASK(12, "start a task node in a process instance"),
     ;
 
     CommandType(int code, String descp) {

+ 9 - 0
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java

@@ -498,6 +498,15 @@ public class DAG<Node, NodeInfo, EdgeInfo> {
         return new AbstractMap.SimpleEntry<>(notZeroIndegreeNodeMap.size() == 0, topoResultList);
     }
 
+    /**
+     * Get all the nodes that are in the graph
+     *
+     * @return all nodes in the graph
+     */
+    public Set<Node> getAllNodesList() {
+        return nodesMap.keySet();
+    }
+
     @Override
     public String toString() {
         return "DAG{"

+ 8 - 0
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java

@@ -66,6 +66,14 @@ public interface TaskInstanceDao {
      */
     List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId, int testFlag);
 
+    /**
+     * Query list of task instance by process instance id and task code
+     * @param processInstanceId processInstanceId
+     * @param taskCode task code
+     * @return list of valid task instance
+     */
+    TaskInstance findTaskByInstanceIdAndCode(Integer processInstanceId, Long taskCode);
+
     /**
      * find previous task list by work process id
      * @param processInstanceId processInstanceId

+ 5 - 0
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java

@@ -147,6 +147,11 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao {
         return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES, testFlag);
     }
 
+    @Override
+    public TaskInstance findTaskByInstanceIdAndCode(Integer processInstanceId, Long taskCode) {
+        return taskInstanceMapper.queryByInstanceIdAndCode(processInstanceId, taskCode);
+    }
+
     @Override
     public List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer processInstanceId) {
         ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);

+ 72 - 0
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@@ -881,6 +881,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                     LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
                 }
             }
+            clearDataIfExecuteTask();
         } else {
             logger.info("The current workflowInstance is a newly running workflowInstance");
         }
@@ -2045,6 +2046,77 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         }
     }
 
+    /**
+     * clear related data if command of process instance is EXECUTE_TASK
+     * 1. find all task code from sub dag (only contains related task)
+     * 2. set the flag of tasks to Flag.NO
+     * 3. clear varPool data from re-execute task instance in process instance
+     * 4. remove related task instance from taskInstanceMap, completeTaskMap, validTaskMap, errorTaskMap
+     *
+     * @return task instance
+     */
+    protected void clearDataIfExecuteTask() {
+        // only clear data if command is EXECUTE_TASK
+        if (!processInstance.getCommandType().equals(CommandType.EXECUTE_TASK)) {
+            return;
+        }
+
+        // Records the key of varPool data to be removed
+        Set<String> taskCodesString = dag.getAllNodesList();
+
+        List<TaskInstance> removeTaskInstances = new ArrayList<>();
+
+        for (String taskCodeString : taskCodesString) {
+            long taskCode = Long.parseLong(taskCodeString);
+            TaskInstance taskInstance;
+            if (validTaskMap.containsKey(taskCode)) {
+                taskInstance = taskInstanceMap.get(validTaskMap.get(taskCode));
+            } else {
+                taskInstance = taskInstanceDao.findTaskByInstanceIdAndCode(processInstance.getId(), taskCode);
+            }
+            if (taskInstance == null) {
+                continue;
+            }
+            removeTaskInstances.add(taskInstance);
+        }
+
+        for (TaskInstance taskInstance : removeTaskInstances) {
+            taskInstance.setFlag(Flag.NO);
+            taskInstanceDao.updateTaskInstance(taskInstance);
+        }
+
+        Set<String> removeSet = new HashSet<>();
+        for (TaskInstance taskInstance : removeTaskInstances) {
+            String taskVarPool = taskInstance.getVarPool();
+            if (StringUtils.isNotEmpty(taskVarPool)) {
+                List<Property> properties = JSONUtils.toList(taskVarPool, Property.class);
+                List<String> keys = properties.stream()
+                        .filter(property -> property.getDirect().equals(Direct.OUT))
+                        .map(property -> String.format("%s_%s", property.getProp(), property.getType()))
+                        .collect(Collectors.toList());
+                removeSet.addAll(keys);
+            }
+        }
+
+        // remove varPool data and update process instance
+        // TODO: we can remove this snippet if : we get varPool from pre taskInstance instead of process instance when
+        // task can not get pre task from incomplete dag
+        List<Property> processProperties = JSONUtils.toList(processInstance.getVarPool(), Property.class);
+        processProperties = processProperties.stream()
+                .filter(property -> !(property.getDirect().equals(Direct.IN)
+                        && removeSet.contains(String.format("%s_%s", property.getProp(), property.getType()))))
+                .collect(Collectors.toList());
+
+        processInstance.setVarPool(JSONUtils.toJsonString(processProperties));
+        processInstanceDao.updateProcessInstance(processInstance);
+
+        // remove task instance from taskInstanceMap, completeTaskMap, validTaskMap, errorTaskMap
+        taskInstanceMap.entrySet().removeIf(map -> dag.containsNode(Long.toString(map.getValue().getTaskCode())));
+        completeTaskMap.entrySet().removeIf(map -> dag.containsNode(Long.toString(map.getKey())));
+        validTaskMap.entrySet().removeIf(map -> dag.containsNode(Long.toString(map.getKey())));
+        errorTaskMap.entrySet().removeIf(map -> dag.containsNode(Long.toString(map.getKey())));
+    }
+
     private enum WorkflowRunnableStatus {
         CREATED, INITIALIZE_DAG, INITIALIZE_QUEUE, STARTED,
         ;

+ 58 - 0
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java

@@ -22,9 +22,11 @@ import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.C
 import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING;
 import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_NODES;
 
+import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
 import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -39,6 +41,7 @@ import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.command.CommandService;
 import org.apache.dolphinscheduler.service.expand.CuringParamsService;
+import org.apache.dolphinscheduler.service.model.TaskNode;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.lang.reflect.Field;
@@ -258,6 +261,61 @@ public class WorkflowExecuteRunnableTest {
         }
     }
 
+    @Test
+    public void testClearDataIfExecuteTask() throws NoSuchFieldException, IllegalAccessException {
+        TaskInstance taskInstance1 = new TaskInstance();
+        taskInstance1.setId(1);
+        taskInstance1.setTaskCode(1);
+
+        TaskInstance taskInstance2 = new TaskInstance();
+        taskInstance2.setId(2);
+        taskInstance2.setTaskCode(2);
+
+        Map<Integer, TaskInstance> taskInstanceMap = new ConcurrentHashMap<>();
+        taskInstanceMap.put(taskInstance1.getId(), taskInstance1);
+        taskInstanceMap.put(taskInstance2.getId(), taskInstance2);
+
+        Map<Long, Integer> completeTaskList = new ConcurrentHashMap<>();
+        completeTaskList.put(taskInstance1.getTaskCode(), taskInstance1.getId());
+        completeTaskList.put(taskInstance2.getTaskCode(), taskInstance2.getId());
+
+        Class<WorkflowExecuteRunnable> masterExecThreadClass = WorkflowExecuteRunnable.class;
+
+        Field completeTaskMapField = masterExecThreadClass.getDeclaredField("completeTaskMap");
+        completeTaskMapField.setAccessible(true);
+        completeTaskMapField.set(workflowExecuteThread, completeTaskList);
+
+        Field taskInstanceMapField = masterExecThreadClass.getDeclaredField("taskInstanceMap");
+        taskInstanceMapField.setAccessible(true);
+        taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap);
+
+        Mockito.when(processInstance.getCommandType()).thenReturn(CommandType.EXECUTE_TASK);
+        Mockito.when(processInstance.getId()).thenReturn(123);
+
+        DAG<String, TaskNode, TaskNodeRelation> dag = Mockito.mock(DAG.class);
+        Set<String> taskCodesString = new HashSet<>();
+        taskCodesString.add("1");
+        taskCodesString.add("2");
+        Mockito.when(dag.getAllNodesList()).thenReturn(taskCodesString);
+        Mockito.when(dag.containsNode("1")).thenReturn(true);
+        Mockito.when(dag.containsNode("2")).thenReturn(false);
+
+        Field dagField = masterExecThreadClass.getDeclaredField("dag");
+        dagField.setAccessible(true);
+        dagField.set(workflowExecuteThread, dag);
+
+        Mockito.when(taskInstanceDao.findTaskByInstanceIdAndCode(processInstance.getId(), taskInstance1.getTaskCode()))
+                .thenReturn(taskInstance1);
+        Mockito.when(taskInstanceDao.findTaskByInstanceIdAndCode(processInstance.getId(), taskInstance2.getTaskCode()))
+                .thenReturn(null);
+
+        workflowExecuteThread.clearDataIfExecuteTask();
+
+        Assertions.assertEquals(1, taskInstanceMap.size());
+        Assertions.assertEquals(1, completeTaskList.size());
+
+    }
+
     private List<Schedule> zeroSchedulerList() {
         return Collections.emptyList();
     }

+ 18 - 2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@@ -889,6 +889,12 @@ public class ProcessServiceImpl implements ProcessService {
                     cmdParam.remove(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING);
                     processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
                 }
+                // delete the StartNodeList from command parameter if last execution is only execute specified tasks
+                if (processInstance.getCommandType().equals(CommandType.EXECUTE_TASK)) {
+                    cmdParam.remove(CommandKeyConstants.CMD_PARAM_START_NODES);
+                    processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
+                    processInstance.setTaskDependType(command.getTaskDependType());
+                }
                 // delete all the valid tasks when repeat running
                 List<TaskInstance> validTaskList =
                         taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(),
@@ -905,6 +911,11 @@ public class ProcessServiceImpl implements ProcessService {
                 break;
             case SCHEDULER:
                 break;
+            case EXECUTE_TASK:
+                processInstance.setRunTimes(runTime + 1);
+                processInstance.setTaskDependType(command.getTaskDependType());
+                processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
+                break;
             default:
                 break;
         }
@@ -2035,8 +2046,13 @@ public class ProcessServiceImpl implements ProcessService {
         // and update the origin one if exist
         int updateResult = 0;
         int insertResult = 0;
-        if (CollectionUtils.isNotEmpty(newTaskDefinitionLogs)) {
-            insertResult += taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs);
+
+        // only insert new task definitions if they not in updateTaskDefinitionLogs
+        List<TaskDefinitionLog> newInsertTaskDefinitionLogs = newTaskDefinitionLogs.stream()
+                .filter(taskDefinitionLog -> !updateTaskDefinitionLogs.contains(taskDefinitionLog))
+                .collect(Collectors.toList());
+        if (CollectionUtils.isNotEmpty(newInsertTaskDefinitionLogs)) {
+            insertResult = taskDefinitionLogMapper.batchInsert(newInsertTaskDefinitionLogs);
         }
         if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs)) {
             insertResult += taskDefinitionLogMapper.batchInsert(updateTaskDefinitionLogs);

+ 4 - 0
dolphinscheduler-ui/src/common/common.ts

@@ -121,6 +121,10 @@ export const runningType = (t: any) => [
   {
     desc: `${t('project.workflow.recover_serial_wait')}`,
     code: 'RECOVER_SERIAL_WAIT'
+  },
+  {
+    desc: `${t('project.workflow.execute_task')}`,
+    code: 'EXECUTE_TASK'
   }
 ]
 

+ 4 - 0
dolphinscheduler-ui/src/locales/en_US/project.ts

@@ -103,6 +103,9 @@ export default {
     backward_execution: 'Backward execution',
     forward_execution: 'Forward execution',
     current_node_execution: 'Execute only the current node',
+    backward_execution_task: 'Run backwards',
+    forward_execution_task: 'Run forwards',
+    current_node_execution_task: 'Run',
     notification_strategy: 'Notification Strategy',
     workflow_priority: 'Workflow Priority',
     worker_group: 'Worker Group',
@@ -158,6 +161,7 @@ export default {
     pause: 'Pause',
     recovery_waiting_thread: 'Recovery waiting thread',
     recover_serial_wait: 'Recover serial wait',
+    execute_task: 'Execute the specified task',
     recovery_suspend: 'Recovery Suspend',
     recovery_failed: 'Recovery Failed',
     gantt: 'Gantt',

+ 4 - 0
dolphinscheduler-ui/src/locales/zh_CN/project.ts

@@ -105,6 +105,9 @@ export default {
     backward_execution: '向后执行',
     forward_execution: '向前执行',
     current_node_execution: '仅执行当前节点',
+    backward_execution_task: '向后运行',
+    forward_execution_task: '向前运行',
+    current_node_execution_task: '运行',
     notification_strategy: '通知策略',
     workflow_priority: '流程优先级',
     worker_group: 'Worker分组',
@@ -160,6 +163,7 @@ export default {
     pause: '暂停',
     recovery_waiting_thread: '恢复等待线程',
     recover_serial_wait: '串行恢复',
+    execute_task: '执行指定任务',
     recovery_suspend: '恢复运行',
     recovery_failed: '重跑失败任务',
     gantt: '甘特图',

+ 9 - 0
dolphinscheduler-ui/src/service/modules/executors/index.ts

@@ -18,6 +18,7 @@
 import { axios } from '@/service/service'
 import {
   ExecuteReq,
+  ExecuteTaskReq,
   ProjectCodeReq,
   ProcessDefinitionCodeReq,
   ProcessInstanceReq
@@ -31,6 +32,14 @@ export function execute(data: ExecuteReq, code: number): any {
   })
 }
 
+export function executeTask(data: ExecuteTaskReq, code: number): any {
+  return axios({
+    url: `/projects/${code}/executors/execute-task`,
+    method: 'post',
+    data
+  })
+}
+
 export function startCheckProcessDefinition(
   data: ProcessDefinitionCodeReq,
   code: ProjectCodeReq

+ 7 - 0
dolphinscheduler-ui/src/service/modules/executors/types.ts

@@ -41,6 +41,12 @@ interface ExecuteReq {
   processInstanceId: number
 }
 
+interface ExecuteTaskReq {
+  processInstanceId: number
+  startNodeList: number
+  taskDependType: string
+}
+
 interface ProjectCodeReq {
   projectCode: number
 }
@@ -69,6 +75,7 @@ interface ProcessInstanceReq extends ProcessDefinitionCodeReq {
 
 export {
   ExecuteReq,
+  ExecuteTaskReq,
   ProjectCodeReq,
   ProcessDefinitionCodeReq,
   ProcessInstanceReq

+ 53 - 8
dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag-context-menu.tsx

@@ -30,6 +30,10 @@ const props = {
     type: Boolean as PropType<boolean>,
     default: false
   },
+  executeTaskDisplay: {
+    type: Boolean as PropType<boolean>,
+    default: false
+  },
   menuDisplay: {
     type: Boolean as PropType<boolean>,
     default: false
@@ -59,7 +63,7 @@ const props = {
 export default defineComponent({
   name: 'dag-context-menu',
   props,
-  emits: ['hide', 'start', 'edit', 'viewLog', 'copyTask', 'removeTasks'],
+  emits: ['hide', 'start', 'edit', 'viewLog', 'copyTask', 'removeTasks', 'executeTask'],
   setup(props, ctx) {
     const graph = inject('graph', ref())
     const route = useRoute()
@@ -83,6 +87,22 @@ export default defineComponent({
       }
     }
 
+    const handleExecuteTaskOnly = () => {
+      ctx.emit('executeTask', Number(props.cell?.id), 'TASK_ONLY')
+    }
+
+    const handleExecuteTaskPOST = () => {
+      if (props.taskInstance) {
+        ctx.emit('executeTask', Number(props.cell?.id), 'TASK_POST')
+      }
+    }
+
+    const handleExecuteTaskPRE = () => {
+      if (props.taskInstance) {
+        ctx.emit('executeTask', Number(props.cell?.id), 'TASK_PRE')
+      }
+    }
+
     const handleCopy = () => {
       const genNums = 1
       const type = props.cell?.data.taskType
@@ -115,7 +135,10 @@ export default defineComponent({
       handleEdit,
       handleCopy,
       handleDelete,
-      handleViewLog
+      handleViewLog,
+      handleExecuteTaskOnly,
+      handleExecuteTaskPOST,
+      handleExecuteTaskPRE
     }
   },
   render() {
@@ -158,12 +181,34 @@ export default defineComponent({
             </>
           )}
           {this.taskInstance && (
-            <NButton
-              class={`${styles['menu-item']}`}
-              onClick={this.handleViewLog}
-            >
-              {t('project.node.view_log')}
-            </NButton>
+              <NButton
+                  class={`${styles['menu-item']}`}
+                  onClick={this.handleViewLog}
+              >
+                {t('project.node.view_log')}
+              </NButton>
+          )}
+          {this.executeTaskDisplay && (
+              <>
+                <NButton
+                    class={`${styles['menu-item']}`}
+                    onClick={this.handleExecuteTaskOnly}
+                >
+                  {t('project.workflow.current_node_execution_task')}
+                </NButton>
+                <NButton
+                    class={`${styles['menu-item']}`}
+                    onClick={this.handleExecuteTaskPOST}
+                >
+                  {t('project.workflow.backward_execution_task')}
+                </NButton>
+                <NButton
+                    class={`${styles['menu-item']}`}
+                    onClick={this.handleExecuteTaskPRE}
+                >
+                  {t('project.workflow.forward_execution_task')}
+                </NButton>
+              </>
           )}
         </div>
       )

+ 25 - 0
dolphinscheduler-ui/src/views/projects/workflow/components/dag/index.tsx

@@ -55,6 +55,7 @@ import { queryLog } from '@/service/modules/log'
 import { useAsyncState } from '@vueuse/core'
 import utils from '@/utils'
 import { useUISettingStore } from '@/store/ui-setting/ui-setting'
+import { executeTask } from '@/service/modules/executors'
 
 const props = {
   // If this prop is passed, it means from definition detail
@@ -135,6 +136,13 @@ export default defineComponent({
       }
     })
 
+    // execute task buttons in the dag node menu
+    const executeTaskDisplay = computed(() => {
+        return (
+            route.name === 'workflow-instance-detail'
+        )
+    })
+
     // other button in the dag node menu
     const menuDisplay = computed(() => {
       if (props.instance) {
@@ -283,6 +291,20 @@ export default defineComponent({
       getLogs(logTimer)
     }
 
+    const handleExecuteTask = (startNodeList: number, taskDependType: string) => {
+      executeTask({
+        processInstanceId: Number(route.params.id),
+        startNodeList: startNodeList,
+        taskDependType: taskDependType,
+      },
+        props.projectCode).then(() => {
+          window.$message.success(t('project.workflow.success'))
+          setTimeout(() => {
+            window.location.reload();
+          }, 1000);
+        })
+    }
+
     const downloadLogs = () => {
       utils.downloadFile('log/download-log', {
         taskInstanceId: nodeVariables.logTaskId
@@ -382,6 +404,7 @@ export default defineComponent({
         />
         <ContextMenuItem
           startDisplay={startDisplay.value}
+          executeTaskDisplay={executeTaskDisplay.value}
           menuDisplay={menuDisplay.value}
           taskInstance={taskInstance.value}
           cell={nodeVariables.menuCell as Cell}
@@ -394,6 +417,7 @@ export default defineComponent({
           onCopyTask={copyTask}
           onRemoveTasks={removeTasks}
           onViewLog={handleViewLog}
+          onExecuteTask={handleExecuteTask}
         />
         {!!props.definition && (
           <StartModal
@@ -418,3 +442,4 @@ export default defineComponent({
     )
   }
 })
+

+ 2 - 3
dolphinscheduler-ui/src/views/projects/workflow/instance/detail/index.tsx

@@ -16,7 +16,7 @@
  */
 
 import { defineComponent, onMounted, ref } from 'vue'
-import { useRoute, useRouter } from 'vue-router'
+import { useRoute } from 'vue-router'
 import { useThemeStore } from '@/store/theme/theme'
 import { useI18n } from 'vue-i18n'
 import Dag from '../../components/dag'
@@ -47,7 +47,6 @@ export default defineComponent({
   setup() {
     const theme = useThemeStore()
     const route = useRoute()
-    const router = useRouter()
     const { t } = useI18n()
     const projectCode = Number(route.params.projectCode)
     const id = Number(route.params.id)
@@ -101,7 +100,7 @@ export default defineComponent({
         projectCode
       ).then((ignored: any) => {
         window.$message.success(t('project.dag.success'))
-        router.push({ path: `/projects/${projectCode}/workflow/instances` })
+        window.location.reload()
       })
     }