Browse Source

[NEW FEATURE][FIX-4385] compensation task add the ability to configure parallelism (#5912)

* update

* web improved

* improve the ui

* add the ability to configure the parallelism

* update variables

* enhance the ut and add necessary note

* fix code style

* fix code style issue

* ensure the complation task in parallel mode can run the right numbers of tasks.
kyoty 3 years ago
parent
commit
1887bde1eb

+ 18 - 14
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java

@@ -83,22 +83,24 @@ public class ExecutorController extends BaseController {
      * @param processInstancePriority process instance priority
      * @param workerGroup worker group
      * @param timeout timeout
+     * @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode
      * @return start process result code
      */
     @ApiOperation(value = "startProcessInstance", notes = "RUN_PROCESS_INSTANCE_NOTES")
     @ApiImplicitParams({
-            @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"),
-            @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataType = "String"),
-            @ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataType = "FailureStrategy"),
-            @ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataType = "String"),
-            @ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataType = "TaskDependType"),
-            @ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataType = "CommandType"),
-            @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", required = true, dataType = "WarningType"),
-            @ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", required = true, dataType = "Int", example = "100"),
-            @ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataType = "RunMode"),
-            @ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority"),
-            @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String", example = "default"),
-            @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int", example = "100"),
+        @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"),
+        @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataType = "String"),
+        @ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataType = "FailureStrategy"),
+        @ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataType = "String"),
+        @ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataType = "TaskDependType"),
+        @ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataType = "CommandType"),
+        @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", required = true, dataType = "WarningType"),
+        @ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", required = true, dataType = "Int", example = "100"),
+        @ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataType = "RunMode"),
+        @ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority"),
+        @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String", example = "default"),
+        @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int", example = "100"),
+        @ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8"),
     })
     @PostMapping(value = "start-process-instance")
     @ResponseStatus(HttpStatus.OK)
@@ -118,7 +120,9 @@ public class ExecutorController extends BaseController {
                                        @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
                                        @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
                                        @RequestParam(value = "timeout", required = false) Integer timeout,
-                                       @RequestParam(value = "startParams", required = false) String startParams) {
+                                       @RequestParam(value = "startParams", required = false) String startParams,
+                                       @RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber
+    ) {
 
         if (timeout == null) {
             timeout = Constants.MAX_TASK_TIMEOUT;
@@ -129,7 +133,7 @@ public class ExecutorController extends BaseController {
         }
         Map<String, Object> result = execService.execProcessInstance(loginUser, projectName, processDefinitionId, scheduleTime, execType, failureStrategy,
                 startNodeList, taskDependType, warningType,
-                warningGroupId, runMode, processInstancePriority, workerGroup, timeout, startParamMap);
+                warningGroupId, runMode, processInstancePriority, workerGroup, timeout, startParamMap, expectedParallelismNumber);
         return returnDataList(result);
     }
 

+ 2 - 1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@@ -52,6 +52,7 @@ public interface ExecutorService {
      * @param runMode run mode
      * @param timeout timeout
      * @param startParams the global param values which pass to new process instance
+     * @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode
      * @return execute process instance code
      */
     Map<String, Object> execProcessInstance(User loginUser, String projectName,
@@ -60,7 +61,7 @@ public interface ExecutorService {
                                             TaskDependType taskDependType, WarningType warningType, int warningGroupId,
                                             RunMode runMode,
                                             Priority processInstancePriority, String workerGroup, Integer timeout,
-                                            Map<String, String> startParams);
+                                            Map<String, String> startParams, Integer expectedParallelismNumber);
 
     /**
      * check whether the process definition can be executed

+ 23 - 13
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@@ -116,6 +116,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
      * @param runMode run mode
      * @param timeout timeout
      * @param startParams the global param values which pass to new process instance
+     * @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode
      * @return execute process instance code
      */
     @Override
@@ -125,7 +126,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                                                    TaskDependType taskDependType, WarningType warningType, int warningGroupId,
                                                    RunMode runMode,
                                                    Priority processInstancePriority, String workerGroup, Integer timeout,
-                                                   Map<String, String> startParams) {
+                                                   Map<String, String> startParams, Integer expectedParallelismNumber) {
         Map<String, Object> result = new HashMap<>();
         // timeout is invalid
         if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) {
@@ -162,7 +163,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
          */
         int create = this.createCommand(commandType, processDefinitionId,
                 taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(),
-                warningGroupId, runMode, processInstancePriority, workerGroup, startParams);
+                warningGroupId, runMode, processInstancePriority, workerGroup, startParams, expectedParallelismNumber);
 
         if (create > 0) {
             processDefinition.setWarningGroupId(warningGroupId);
@@ -392,10 +393,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
     /**
      * insert command, used in the implementation of the page, re run, recovery (pause / failure) execution
      *
-     * @param loginUser           login user
-     * @param instanceId          instance id
+     * @param loginUser login user
+     * @param instanceId instance id
      * @param processDefinitionId process definition id
-     * @param commandType         command type
+     * @param commandType command type
      * @return insert result code
      */
     private Map<String, Object> insertCommand(User loginUser, Integer instanceId, Integer processDefinitionId, CommandType commandType, String startParams) {
@@ -489,7 +490,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                               String startNodeList, String schedule, WarningType warningType,
                               int executorId, int warningGroupId,
                               RunMode runMode, Priority processInstancePriority, String workerGroup,
-                              Map<String, String> startParams) {
+                              Map<String, String> startParams, Integer expectedParallelismNumber) {
 
         /**
          * instantiate command schedule instance
@@ -534,7 +535,6 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                 end = DateUtils.getScheduleDate(interval[1]);
             }
         }
-
         // determine whether to complement
         if (commandType == CommandType.COMPLEMENT_DATA) {
             runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
@@ -546,21 +546,31 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                     return processService.createCommand(command);
                 } else if (runMode == RunMode.RUN_MODE_PARALLEL) {
                     List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionId(processDefineId);
-                    List<Date> listDate = new LinkedList<>();
+                    LinkedList<Date> listDate = new LinkedList<>();
                     if (!CollectionUtils.isEmpty(schedules)) {
                         for (Schedule item : schedules) {
                             listDate.addAll(CronUtils.getSelfFireDateList(start, end, item.getCrontab()));
                         }
                     }
                     if (!CollectionUtils.isEmpty(listDate)) {
-                        // loop by schedule date
-                        for (Date date : listDate) {
-                            cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(date));
-                            cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(date));
+                        int effectThreadsCount = expectedParallelismNumber == null ? listDate.size() : Math.min(listDate.size(), expectedParallelismNumber);
+                        logger.info("In parallel mode, current expectedParallelismNumber:{}", effectThreadsCount);
+
+                        int chunkSize = listDate.size() / effectThreadsCount;
+                        listDate.addFirst(start);
+                        listDate.addLast(end);
+
+                        for (int i = 0; i < effectThreadsCount; i++) {
+                            int rangeStart = i == 0 ? i : (i * chunkSize);
+                            int rangeEnd = i == effectThreadsCount - 1 ? listDate.size() - 1
+                                    : rangeStart + chunkSize + 1;
+                            cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(rangeStart)));
+                            cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(rangeEnd)));
                             command.setCommandParam(JSONUtils.toJsonString(cmdParam));
                             processService.createCommand(command);
                         }
-                        return listDate.size();
+
+                        return effectThreadsCount;
                     } else {
                         // loop by day
                         int runCunt = 0;

+ 1 - 0
dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties

@@ -171,6 +171,7 @@ PROCESS_INSTANCE_START_TIME=process instance start time
 PROCESS_INSTANCE_END_TIME=process instance end time
 PROCESS_INSTANCE_SIZE=process instance size
 PROCESS_INSTANCE_PRIORITY=process instance priority
+EXPECTED_PARALLELISM_NUMBER=custom parallelism to set the complement task threads
 UPDATE_SCHEDULE_NOTES=update schedule 
 SCHEDULE_ID=schedule id
 ONLINE_SCHEDULE_NOTES=online schedule

+ 1 - 0
dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties

@@ -157,6 +157,7 @@ RECEIVERS=收件人
 RECEIVERS_CC=收件人(抄送)
 WORKER_GROUP_ID=Worker Server分组ID
 PROCESS_INSTANCE_PRIORITY=流程实例优先级
+EXPECTED_PARALLELISM_NUMBER=补数任务自定义并行度
 UPDATE_SCHEDULE_NOTES=更新定时
 SCHEDULE_ID=定时ID
 ONLINE_SCHEDULE_NOTES=定时上线

+ 9 - 9
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java

@@ -153,7 +153,7 @@ public class ExecutorService2Test {
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_SERIAL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null, 4);
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         verify(processService, times(1)).createCommand(any(Command.class));
 
@@ -171,7 +171,7 @@ public class ExecutorService2Test {
                 null, "n1,n2",
                 null, null, 0,
                 RunMode.RUN_MODE_SERIAL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null, null);
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         verify(processService, times(1)).createCommand(any(Command.class));
 
@@ -190,7 +190,7 @@ public class ExecutorService2Test {
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_SERIAL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null, null);
         Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS));
         verify(processService, times(0)).createCommand(any(Command.class));
     }
@@ -207,7 +207,7 @@ public class ExecutorService2Test {
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_SERIAL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null, null);
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         verify(processService, times(1)).createCommand(any(Command.class));
 
@@ -225,7 +225,7 @@ public class ExecutorService2Test {
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_PARALLEL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null, null);
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         verify(processService, times(31)).createCommand(any(Command.class));
 
@@ -243,9 +243,9 @@ public class ExecutorService2Test {
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_PARALLEL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null, 4);
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
-        verify(processService, times(15)).createCommand(any(Command.class));
+        verify(processService, times(4)).createCommand(any(Command.class));
 
     }
 
@@ -258,7 +258,7 @@ public class ExecutorService2Test {
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_PARALLEL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null, 4);
         Assert.assertEquals(result.get(Constants.STATUS), Status.MASTER_NOT_EXISTS);
 
     }
@@ -266,7 +266,7 @@ public class ExecutorService2Test {
     @Test
     public void testExecuteRepeatRunning() {
         Mockito.when(processService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true);
-        
+
         Map<String, Object> result = executorService.execute(loginUser, projectName, processInstanceId, ExecuteType.REPEAT_RUNNING);
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
     }

+ 66 - 2
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue

@@ -116,12 +116,31 @@
           {{$t('Mode of execution')}}
         </div>
         <div class="cont">
-          <el-radio-group v-model="runMode" style="margin-top: 7px;">
+          <el-radio-group @change="_updateParallelStatus" style="margin-top: 7px;"
+                          v-model="runMode">
             <el-radio :label="'RUN_MODE_SERIAL'">{{$t('Serial execution')}}</el-radio>
             <el-radio :label="'RUN_MODE_PARALLEL'">{{$t('Parallel execution')}}</el-radio>
           </el-radio-group>
         </div>
       </div>
+      <div class="clearfix list" style="margin:-6px 0 16px 0" v-if="runMode === 'RUN_MODE_PARALLEL'">
+        <div class="text" style="padding-top: 6px;">
+          <em @click="_showParallelismInfo" class="ans el-icon-warning"></em>
+          {{$t('Parallelism')}}
+        </div>
+        <div class="cont" style="padding-top: 8px;">
+          <el-checkbox @change="_updateEnableCustomParallel" size="small"
+                       v-model="enableCustomParallelism">{{$t('Custom Parallelism')}}
+            <el-input :disabled="!enableCustomParallelism"
+                      :placeholder="$t('Please enter Parallelism')"
+                      ref="parallelismInput"
+                      size="mini"
+                      type="input"
+                      v-model="parallismNumber">
+            </el-input>
+          </el-checkbox>
+        </div>
+      </div>
       <div class="clearfix list">
         <div class="text">
           {{$t('Schedule date')}}
@@ -164,6 +183,7 @@
 </template>
 <script>
   import _ from 'lodash'
+  import i18n from '@/module/i18n'
   import dayjs from 'dayjs'
   import store from '@/conf/home/store'
   import { warningTypeList } from './util'
@@ -188,6 +208,8 @@
         scheduleTime: '',
         spinnerLoading: false,
         execType: false,
+        enableCustomParallelism: false,
+        parallismNumber: null,
         taskDependType: 'TASK_POST',
         runMode: 'RUN_MODE_SERIAL',
         processInstancePriority: 'MEDIUM',
@@ -208,13 +230,33 @@
     },
     methods: {
       ...mapMutations('dag', ['setIsDetails', 'resetParams']),
+      _showParallelismInfo () {
+        this.$message.warning({
+          dangerouslyUseHTMLString: true,
+          message: `<p style='font-size: 14px;'>${i18n.$t('Parallelism tip')}</p>`
+        })
+      },
       _onLocalParams (a) {
         this.udpList = a
       },
       _datepicker (val) {
         this.scheduleTime = val
       },
+      _verification () {
+        if (this.enableCustomParallelism && !this.parallismNumber) {
+          this.$message.warning(`${i18n.$t('Parallelism number should be positive integer')}`)
+          return false
+        }
+        if (this.parallismNumber && !(/(^[1-9]\d*$)/.test(this.parallismNumber))) {
+          this.$message.warning(`${i18n.$t('Parallelism number should be positive integer')}`)
+          return false
+        }
+        return true
+      },
       _start () {
+        if (!this._verification()) {
+          return
+        }
         this.spinnerLoading = true
         let startParams = {}
         for (const item of this.udpList) {
@@ -234,7 +276,8 @@
           runMode: this.runMode,
           processInstancePriority: this.processInstancePriority,
           workerGroup: this.workerGroup,
-          startParams: !_.isEmpty(startParams) ? JSON.stringify(startParams) : ''
+          startParams: !_.isEmpty(startParams) ? JSON.stringify(startParams) : '',
+          expectedParallelismNumber: this.parallismNumber
         }
         // Executed from the specified node
         if (this.sourceType === 'contextmenu') {
@@ -262,6 +305,19 @@
           })
         })
       },
+      _updateParallelStatus () {
+        this.enableCustomParallelism = false
+        this.parallismNumber = null
+      },
+      _updateEnableCustomParallel () {
+        if (!this.enableCustomParallelism) {
+          this.parallismNumber = null
+        } else {
+          this.$nextTick(() => {
+            this.$refs.parallelismInput.focus()
+          })
+        }
+      },
       _getGlobalParams () {
         this.store.dispatch('dag/getProcessDetails', this.startData.id).then(res => {
           this.definitionGlobalParams = _.cloneDeep(this.store.state.dag.globalParams)
@@ -325,6 +381,14 @@
         display: block;
       }
     }
+
+    .ans {
+      color: #0097e0;
+      font-size: 14px;
+      vertical-align: middle;
+      cursor: pointer;
+    }
+
     .list {
       margin-bottom: 14px;
       .text {

+ 4 - 0
dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js

@@ -125,7 +125,11 @@ export default {
   'Slot Number': 'Slot Number',
   'Please enter Slot number': 'Please enter Slot number',
   Parallelism: 'Parallelism',
+  'Custom Parallelism': 'Configure parallelism',
   'Please enter Parallelism': 'Please enter Parallelism',
+  'Parallelism tip': 'If there are a large number of tasks requiring complement, you can use the custom parallelism to ' +
+  'set the complement task thread to a reasonable value to avoid too large impact on the server.',
+  'Parallelism number should be positive integer': 'Parallelism number should be positive integer',
   'TaskManager Number': 'TaskManager Number',
   'Please enter TaskManager number': 'Please enter TaskManager number',
   'App Name': 'App Name',

+ 3 - 0
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@@ -125,7 +125,10 @@ export default {
   'Slot Number': 'Slot数量',
   'Please enter Slot number': '请输入Slot数量',
   Parallelism: '并行度',
+  'Custom Parallelism': '自定义并行度',
   'Please enter Parallelism': '请输入并行度',
+  'Parallelism number should be positive integer': '并行度必须为正整数',
+  'Parallelism tip': '如果存在大量任务需要补数时,可以利用自定义并行度将补数的任务线程设置成合理的数值,避免对服务器造成过大的影响',
   'TaskManager Number': 'TaskManager数量',
   'Please enter TaskManager number': '请输入TaskManager数量',
   'App Name': '任务名称',