Pārlūkot izejas kodu

[improve-#13053] in method of ``` execProcessInstance``` check ```startNodeList``` (#13057)

* [improve-#13053] in method of ``` execProcessInstance``` check ```startNodeList```

* [improve-#13053] format

* [improve-#13053] fix UT

* Update Status.java

update description

* [improve-#13053] add parameter ```version ```

* [improve-#13053] format

* [improve-#13053] version is not necessary

* [improve-#13053] format

* Update use-form.ts

change  ```version``` default vaule to ``` null```

* [improve-#13053] format

* Update dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

Co-authored-by: caishunfeng <caishunfeng2021@gmail.com>

* Update ExecutorServiceImpl.java

Co-authored-by: fuchanghai <‘2875334588@qq.com’>
Co-authored-by: caishunfeng <caishunfeng2021@gmail.com>
fuchanghai 2 gadi atpakaļ
vecāks
revīzija
7027d3d768

+ 4 - 3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java

@@ -147,7 +147,8 @@ public class ExecutorController extends BaseController {
                                        @RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber,
                                        @RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun,
                                        @RequestParam(value = "testFlag", defaultValue = "0") int testFlag,
-                                       @RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode) {
+                                       @RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode,
+                                       @RequestParam(value = "version", required = false) Integer version) {
 
         if (timeout == null) {
             timeout = Constants.MAX_TASK_TIMEOUT;
@@ -165,7 +166,7 @@ public class ExecutorController extends BaseController {
                 scheduleTime, execType, failureStrategy,
                 startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority,
                 workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun, testFlag,
-                complementDependentMode);
+                complementDependentMode, version);
         return returnDataList(result);
     }
 
@@ -264,7 +265,7 @@ public class ExecutorController extends BaseController {
                     execType, failureStrategy,
                     startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority,
                     workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun, testFlag,
-                    complementDependentMode);
+                    complementDependentMode, null);
 
             if (!Status.SUCCESS.equals(result.get(Constants.STATUS))) {
                 logger.error("Process definition start failed, projectCode:{}, processDefinitionCode:{}.", projectCode,

+ 3 - 0
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@@ -263,6 +263,9 @@ public enum Status {
     QUERY_TASK_INSTANCE_ERROR(10205, "query task instance error", "查询任务实例错误"),
     EXECUTE_NOT_DEFINE_TASK(10206, "please save and try again",
             "请先保存后再执行"),
+    START_NODE_NOT_EXIST_IN_LAST_PROCESS(10207, "this node {0} does not exist in the latest process definition",
+            "该节点 {0} 不存在于最新的流程定义中"),
+
     UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
     UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
     RESOURCE_NOT_EXIST(20004, "resource not exist", "资源不存在"),

+ 2 - 1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

@@ -389,7 +389,8 @@ public class PythonGateway {
                 null,
                 DEFAULT_DRY_RUN,
                 DEFAULT_TEST_FLAG,
-                COMPLEMENT_DEPENDENT_MODE);
+                COMPLEMENT_DEPENDENT_MODE,
+                processDefinition.getVersion());
     }
 
     // side object

+ 1 - 1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@@ -69,7 +69,7 @@ public interface ExecutorService {
                                             Integer timeout,
                                             Map<String, String> startParams, Integer expectedParallelismNumber,
                                             int dryRun, int testFlag,
-                                            ComplementDependentMode complementDependentMode);
+                                            ComplementDependentMode complementDependentMode, Integer version);
 
     /**
      * check whether the process definition can be executed

+ 22 - 4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@@ -195,7 +195,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                                                    Long environmentCode, Integer timeout,
                                                    Map<String, String> startParams, Integer expectedParallelismNumber,
                                                    int dryRun, int testFlag,
-                                                   ComplementDependentMode complementDependentMode) {
+                                                   ComplementDependentMode complementDependentMode, Integer version) {
         Project project = projectMapper.queryByCode(projectCode);
         // check user access for project
         Map<String, Object> result =
@@ -209,12 +209,17 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
             putMsg(result, Status.TASK_TIMEOUT_PARAMS_ERROR);
             return result;
         }
-
+        ProcessDefinition processDefinition;
+        if (null != version) {
+            processDefinition = processService.findProcessDefinition(processDefinitionCode, version);
+        } else {
+            processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
+        }
         // check process define release state
-        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
         this.checkProcessDefinitionValid(projectCode, processDefinition, processDefinitionCode,
                 processDefinition.getVersion());
-
+        // check current version whether include startNodeList
+        checkStartNodeList(startNodeList, processDefinitionCode, processDefinition.getVersion());
         if (!checkTenantSuitable(processDefinition)) {
             logger.error(
                     "There is not any valid tenant for the process definition, processDefinitionCode:{}, processDefinitionName:{}.",
@@ -634,6 +639,19 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         return tenant != null;
     }
 
+    public void checkStartNodeList(String startNodeList, Long processDefinitionCode, int version) {
+        if (StringUtils.isNotEmpty(startNodeList)) {
+            List<ProcessTaskRelation> processTaskRelations =
+                    processService.findRelationByCode(processDefinitionCode, version);
+            List<Long> existsNodes = processTaskRelations.stream().map(ProcessTaskRelation::getPostTaskCode)
+                    .collect(Collectors.toList());
+            for (String startNode : startNodeList.split(Constants.COMMA)) {
+                if (!existsNodes.contains(Long.valueOf(startNode))) {
+                    throw new ServiceException(Status.START_NODE_NOT_EXIST_IN_LAST_PROCESS, startNode);
+                }
+            }
+        }
+    }
     /**
      * Check the state of process instance and the type of operation match
      *

+ 5 - 4
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java

@@ -77,6 +77,7 @@ public class ExecutorControllerTest extends AbstractControllerTest {
     final int dryRun = 7;
     final int testFlag = 0;
     final ComplementDependentMode complementDependentMode = ComplementDependentMode.OFF_MODE;
+    final Integer version = null;
 
     final JsonObject expectResponseContent = gson
             .fromJson("{\"code\":0,\"msg\":\"success\",\"data\":\"Test Data\",\"success\":true,\"failed\":false}",
@@ -115,7 +116,7 @@ public class ExecutorControllerTest extends AbstractControllerTest {
                 eq(warningType),
                 eq(warningGroupId), eq(runMode), eq(processInstancePriority), eq(workerGroup), eq(environmentCode),
                 eq(timeout), eq(startParams), eq(expectedParallelismNumber), eq(dryRun), eq(testFlag),
-                eq(complementDependentMode)))
+                eq(complementDependentMode), eq(version)))
                         .thenReturn(executeServiceResult);
 
         // When
@@ -159,7 +160,7 @@ public class ExecutorControllerTest extends AbstractControllerTest {
                 eq(warningGroupId), eq(runMode), eq(processInstancePriority), eq(workerGroup), eq(environmentCode),
                 eq(Constants.MAX_TASK_TIMEOUT), eq(startParams), eq(expectedParallelismNumber), eq(dryRun),
                 eq(testFlag),
-                eq(complementDependentMode))).thenReturn(executeServiceResult);
+                eq(complementDependentMode), eq(version))).thenReturn(executeServiceResult);
 
         // When
         final MvcResult mvcResult = mockMvc
@@ -201,7 +202,7 @@ public class ExecutorControllerTest extends AbstractControllerTest {
                 eq(warningType),
                 eq(warningGroupId), eq(runMode), eq(processInstancePriority), eq(workerGroup), eq(environmentCode),
                 eq(timeout), eq(null), eq(expectedParallelismNumber), eq(dryRun), eq(testFlag),
-                eq(complementDependentMode))).thenReturn(executeServiceResult);
+                eq(complementDependentMode), eq(version))).thenReturn(executeServiceResult);
 
         // When
         final MvcResult mvcResult = mockMvc
@@ -230,7 +231,7 @@ public class ExecutorControllerTest extends AbstractControllerTest {
                 eq(scheduleTime), eq(null), eq(failureStrategy), eq(null), eq(null), eq(warningType),
                 eq(null), eq(null), eq(null), eq("default"), eq(-1L),
                 eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0), eq(0),
-                eq(complementDependentMode))).thenReturn(executeServiceResult);
+                eq(complementDependentMode), eq(version))).thenReturn(executeServiceResult);
 
         // When
         final MvcResult mvcResult = mockMvc

+ 48 - 11
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java

@@ -30,6 +30,7 @@ import static org.mockito.Mockito.when;
 import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowExecuteResponse;
 import org.apache.dolphinscheduler.api.enums.ExecuteType;
 import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
 import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
 import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl;
@@ -50,6 +51,7 @@ import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
@@ -143,6 +145,8 @@ public class ExecutorServiceTest {
 
     private int processDefinitionId = 1;
 
+    private int processDefinitionVersion = 1;
+
     private long processDefinitionCode = 1L;
 
     private int processInstanceId = 1;
@@ -159,6 +163,8 @@ public class ExecutorServiceTest {
 
     private TaskGroupQueue taskGroupQueue = new TaskGroupQueue();
 
+    private List<ProcessTaskRelation> processTaskRelations = new ArrayList<>();
+
     private User loginUser = new User();
 
     private long projectCode = 1L;
@@ -203,20 +209,30 @@ public class ExecutorServiceTest {
         // cronRangeTime
         cronTime = "2020-01-01 00:00:00,2020-01-31 23:00:00";
 
+        // processTaskRelations
+        ProcessTaskRelation processTaskRelation1 = new ProcessTaskRelation();
+        processTaskRelation1.setPostTaskCode(123456789L);
+        ProcessTaskRelation processTaskRelation2 = new ProcessTaskRelation();
+        processTaskRelation2.setPostTaskCode(987654321L);
+        processTaskRelations.add(processTaskRelation1);
+        processTaskRelations.add(processTaskRelation2);
+
         // mock
         Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_START))
                 .thenReturn(checkProjectAndAuth());
-        Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition);
+        Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(this.processDefinition);
         Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant());
         doReturn(1).when(commandService).createCommand(argThat(c -> c.getId() == null));
         doReturn(0).when(commandService).createCommand(argThat(c -> c.getId() != null));
         Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList());
         Mockito.when(processService.findProcessInstanceDetailById(processInstanceId))
                 .thenReturn(Optional.ofNullable(processInstance));
-        Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(processDefinition);
+        Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(this.processDefinition);
         Mockito.when(taskGroupQueueMapper.selectById(1)).thenReturn(taskGroupQueue);
         Mockito.when(processInstanceMapper.selectById(1)).thenReturn(processInstance);
+        Mockito.when(processService.findRelationByCode(processDefinitionCode, processDefinitionVersion))
+                .thenReturn(processTaskRelations);
     }
 
     @Test
@@ -243,7 +259,7 @@ public class ExecutorServiceTest {
                 RunMode.RUN_MODE_SERIAL,
                 Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 0, Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_NO,
-                ComplementDependentMode.OFF_MODE);
+                ComplementDependentMode.OFF_MODE, null);
         Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         verify(commandService, times(1)).createCommand(any(Command.class));
 
@@ -261,17 +277,38 @@ public class ExecutorServiceTest {
                 processDefinitionCode,
                 "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}",
                 CommandType.START_PROCESS,
-                null, "n1,n2",
+                null, "123456789,987654321",
                 null, null, null,
                 RunMode.RUN_MODE_SERIAL,
                 Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_NO,
-                ComplementDependentMode.OFF_MODE);
+                ComplementDependentMode.OFF_MODE, null);
         Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         verify(commandService, times(1)).createCommand(any(Command.class));
 
     }
 
+    @Test
+    public void testComplementWithOldStartNodeList() {
+        Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode))
+                .thenReturn(zeroSchedulerList());
+        Map<String, Object> result = new HashMap<>();
+        try {
+            result = executorService.execProcessInstance(loginUser, projectCode,
+                    processDefinitionCode,
+                    "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}",
+                    CommandType.START_PROCESS,
+                    null, "1123456789,987654321",
+                    null, null, null,
+                    RunMode.RUN_MODE_SERIAL,
+                    Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
+                    Constants.TEST_FLAG_NO,
+                    ComplementDependentMode.OFF_MODE, null);
+        } catch (ServiceException e) {
+            Assertions.assertEquals(Status.START_NODE_NOT_EXIST_IN_LAST_PROCESS.getCode(), e.getCode());
+        }
+    }
+
     @Test
     public void testComplementWithDependentMode() {
         Schedule schedule = new Schedule();
@@ -333,7 +370,7 @@ public class ExecutorServiceTest {
                 RunMode.RUN_MODE_SERIAL,
                 Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_NO,
-                ComplementDependentMode.OFF_MODE);
+                ComplementDependentMode.OFF_MODE, null);
         Assertions.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS));
         verify(commandService, times(0)).createCommand(any(Command.class));
     }
@@ -355,7 +392,7 @@ public class ExecutorServiceTest {
                 RunMode.RUN_MODE_SERIAL,
                 Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_NO,
-                ComplementDependentMode.OFF_MODE);
+                ComplementDependentMode.OFF_MODE, null);
         Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         verify(commandService, times(1)).createCommand(any(Command.class));
     }
@@ -377,7 +414,7 @@ public class ExecutorServiceTest {
                 RunMode.RUN_MODE_PARALLEL,
                 Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_NO,
-                ComplementDependentMode.OFF_MODE);
+                ComplementDependentMode.OFF_MODE, null);
         Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         verify(commandService, times(31)).createCommand(any(Command.class));
 
@@ -400,7 +437,7 @@ public class ExecutorServiceTest {
                 RunMode.RUN_MODE_PARALLEL,
                 Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 15, Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_NO,
-                ComplementDependentMode.OFF_MODE);
+                ComplementDependentMode.OFF_MODE, null);
         Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         verify(commandService, times(15)).createCommand(any(Command.class));
 
@@ -419,7 +456,7 @@ public class ExecutorServiceTest {
                 RunMode.RUN_MODE_PARALLEL,
                 Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_NO,
-                ComplementDependentMode.OFF_MODE);
+                ComplementDependentMode.OFF_MODE, null);
         Assertions.assertEquals(result.get(Constants.STATUS), Status.MASTER_NOT_EXISTS);
 
     }
@@ -448,7 +485,7 @@ public class ExecutorServiceTest {
                 RunMode.RUN_MODE_PARALLEL,
                 Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 15, Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_YES,
-                ComplementDependentMode.OFF_MODE);
+                ComplementDependentMode.OFF_MODE, null);
         Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
     }
 

+ 1 - 0
dolphinscheduler-ui/src/service/modules/executors/types.ts

@@ -71,6 +71,7 @@ interface ProcessInstanceReq extends ProcessDefinitionCodeReq {
   taskDependType?: 'TASK_ONLY' | 'TASK_PRE' | 'TASK_POST'
   timeout?: number
   workerGroup?: string
+  version?: number
 }
 
 export {

+ 1 - 1
dolphinscheduler-ui/src/views/projects/workflow/definition/components/start-modal.tsx

@@ -92,7 +92,7 @@ export default defineComponent({
     }
 
     const handleStart = () => {
-      handleStartDefinition(props.row.code)
+      handleStartDefinition(props.row.code,props.row.version)
     }
 
     const generalWarningTypeListOptions = () => [

+ 2 - 1
dolphinscheduler-ui/src/views/projects/workflow/definition/components/use-form.ts

@@ -67,7 +67,8 @@ export const useForm = () => {
       startParams: null,
       expectedParallelismNumber: '',
       dryRun: 0,
-      testFlag: 0
+      testFlag: 0,
+      version: null
     },
     saving: false,
     rules: {

+ 2 - 1
dolphinscheduler-ui/src/views/projects/workflow/definition/components/use-modal.ts

@@ -85,13 +85,14 @@ export function useModal(
     }
   }
 
-  const handleStartDefinition = async (code: number) => {
+  const handleStartDefinition = async (code: number,version: number) => {
     await state.startFormRef.validate()
 
     if (state.saving) return
     state.saving = true
     try {
       state.startForm.processDefinitionCode = code
+      state.startForm.version = version
       const params = omit(state.startForm, [
         'startEndTime',
         'scheduleTime',