Browse Source

[Feature-14321][API] Support to complement data in descending or ascending order of date (#14450)

calvin 1 year ago
parent
commit
76b1eefb68

+ 13 - 6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java

@@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
+import org.apache.dolphinscheduler.common.enums.ExecutionOrder;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.RunMode;
@@ -101,6 +102,7 @@ public class ExecutorController extends BaseController {
      * @param timeout timeout
      * @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode
      * @param testFlag testFlag
+     * @param executionOrder complement data in some kind of order
      * @return start process result code
      */
     @Operation(summary = "startProcessInstance", description = "RUN_PROCESS_INSTANCE_NOTES")
@@ -123,7 +125,8 @@ public class ExecutorController extends BaseController {
             @Parameter(name = "dryRun", description = "DRY_RUN", schema = @Schema(implementation = int.class, example = "0")),
             @Parameter(name = "testFlag", description = "TEST_FLAG", schema = @Schema(implementation = int.class, example = "0")),
             @Parameter(name = "complementDependentMode", description = "COMPLEMENT_DEPENDENT_MODE", schema = @Schema(implementation = ComplementDependentMode.class)),
-            @Parameter(name = "allLevelDependent", description = "ALL_LEVEL_DEPENDENT", schema = @Schema(implementation = boolean.class, example = "false"))
+            @Parameter(name = "allLevelDependent", description = "ALL_LEVEL_DEPENDENT", schema = @Schema(implementation = boolean.class, example = "false")),
+            @Parameter(name = "executionOrder", description = "EXECUTION_ORDER", schema = @Schema(implementation = ExecutionOrder.class))
     })
     @PostMapping(value = "start-process-instance")
     @ResponseStatus(HttpStatus.OK)
@@ -151,7 +154,8 @@ public class ExecutorController extends BaseController {
                                        @RequestParam(value = "testFlag", defaultValue = "0") int testFlag,
                                        @RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode,
                                        @RequestParam(value = "version", required = false) Integer version,
-                                       @RequestParam(value = "allLevelDependent", required = false, defaultValue = "false") boolean allLevelDependent) {
+                                       @RequestParam(value = "allLevelDependent", required = false, defaultValue = "false") boolean allLevelDependent,
+                                       @RequestParam(value = "executionOrder", required = false) ExecutionOrder executionOrder) {
 
         if (timeout == null) {
             timeout = Constants.MAX_TASK_TIMEOUT;
@@ -170,7 +174,7 @@ public class ExecutorController extends BaseController {
                 startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority,
                 workerGroup, tenantCode, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun,
                 testFlag,
-                complementDependentMode, version, allLevelDependent);
+                complementDependentMode, version, allLevelDependent, executionOrder);
         return returnDataList(result);
     }
 
@@ -196,6 +200,7 @@ public class ExecutorController extends BaseController {
      * @param timeout timeout
      * @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode
      * @param testFlag testFlag
+     * @param executionOrder complement data in some kind of order
      * @return start process result code
      */
     @Operation(summary = "batchStartProcessInstance", description = "BATCH_RUN_PROCESS_INSTANCE_NOTES")
@@ -218,7 +223,8 @@ public class ExecutorController extends BaseController {
             @Parameter(name = "dryRun", description = "DRY_RUN", schema = @Schema(implementation = int.class, example = "0")),
             @Parameter(name = "testFlag", description = "TEST_FLAG", schema = @Schema(implementation = int.class, example = "0")),
             @Parameter(name = "complementDependentMode", description = "COMPLEMENT_DEPENDENT_MODE", schema = @Schema(implementation = ComplementDependentMode.class)),
-            @Parameter(name = "allLevelDependent", description = "ALL_LEVEL_DEPENDENT", schema = @Schema(implementation = boolean.class, example = "false"))
+            @Parameter(name = "allLevelDependent", description = "ALL_LEVEL_DEPENDENT", schema = @Schema(implementation = boolean.class, example = "false")),
+            @Parameter(name = "executionOrder", description = "EXECUTION_ORDER", schema = @Schema(implementation = ExecutionOrder.class))
     })
     @PostMapping(value = "batch-start-process-instance")
     @ResponseStatus(HttpStatus.OK)
@@ -245,7 +251,8 @@ public class ExecutorController extends BaseController {
                                             @RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun,
                                             @RequestParam(value = "testFlag", defaultValue = "0") int testFlag,
                                             @RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode,
-                                            @RequestParam(value = "allLevelDependent", required = false, defaultValue = "false") boolean allLevelDependent) {
+                                            @RequestParam(value = "allLevelDependent", required = false, defaultValue = "false") boolean allLevelDependent,
+                                            @RequestParam(value = "executionOrder", required = false) ExecutionOrder executionOrder) {
 
         if (timeout == null) {
             log.debug("Parameter timeout set to {} due to null.", Constants.MAX_TASK_TIMEOUT);
@@ -275,7 +282,7 @@ public class ExecutorController extends BaseController {
                     startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority,
                     workerGroup, tenantCode, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun,
                     testFlag,
-                    complementDependentMode, null, allLevelDependent);
+                    complementDependentMode, null, allLevelDependent, executionOrder);
 
             if (!Status.SUCCESS.equals(result.get(Constants.STATUS))) {
                 log.error("Process definition start failed, projectCode:{}, processDefinitionCode:{}.", projectCode,

+ 3 - 0
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@@ -413,6 +413,9 @@ public enum Status {
     WORKFLOW_INSTANCE_IS_NOT_FINISHED(50071, "the workflow instance is not finished, can not do this operation",
             "工作流实例未结束,不能执行此操作"),
 
+    TASK_PARALLELISM_PARAMS_ERROR(50080, "task parallelism parameter is not valid", "任务并行度参数无效"),
+    TASK_COMPLEMENT_DATA_DATE_ERROR(50081, "The range of date for complementing date is not valid", "补数选择的日期范围无效"),
+
     HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
     STORAGE_NOT_STARTUP(60002, "storage not startup", "存储未启用"),
     S3_CANNOT_RENAME(60003, "directory cannot be renamed", "S3无法重命名文件夹"),

+ 4 - 1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

@@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.api.service.UsersService;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
+import org.apache.dolphinscheduler.common.enums.ExecutionOrder;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
@@ -94,6 +95,7 @@ public class PythonGateway {
 
     private static final TaskDependType DEFAULT_TASK_DEPEND_TYPE = TaskDependType.TASK_POST;
     private static final RunMode DEFAULT_RUN_MODE = RunMode.RUN_MODE_SERIAL;
+    private static final ExecutionOrder DEFAULT_EXECUTION_ORDER = ExecutionOrder.DESC_ORDER;
     private static final int DEFAULT_DRY_RUN = 0;
     private static final int DEFAULT_TEST_FLAG = 0;
     private static final ComplementDependentMode COMPLEMENT_DEPENDENT_MODE = ComplementDependentMode.OFF_MODE;
@@ -405,7 +407,8 @@ public class PythonGateway {
                 DEFAULT_TEST_FLAG,
                 COMPLEMENT_DEPENDENT_MODE,
                 processDefinition.getVersion(),
-                false);
+                false,
+                DEFAULT_EXECUTION_ORDER);
     }
 
     // side object

+ 3 - 1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowExecuteRespo
 import org.apache.dolphinscheduler.api.enums.ExecuteType;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
+import org.apache.dolphinscheduler.common.enums.ExecutionOrder;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.RunMode;
@@ -58,6 +59,7 @@ public interface ExecutorService {
      * @param timeout timeout
      * @param startParams the global param values which pass to new process instance
      * @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode
+     * @param executionOrder the execution order when complementing data
      * @return execute process instance code
      */
     Map<String, Object> execProcessInstance(User loginUser, long projectCode,
@@ -72,7 +74,7 @@ public interface ExecutorService {
                                             Map<String, String> startParams, Integer expectedParallelismNumber,
                                             int dryRun, int testFlag,
                                             ComplementDependentMode complementDependentMode, Integer version,
-                                            boolean allLevelDependent);
+                                            boolean allLevelDependent, ExecutionOrder executionOrder);
 
     /**
      * check whether the process definition can be executed

+ 118 - 141
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@@ -48,6 +48,7 @@ import org.apache.dolphinscheduler.common.enums.ApiTriggerType;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
 import org.apache.dolphinscheduler.common.enums.CycleEnum;
+import org.apache.dolphinscheduler.common.enums.ExecutionOrder;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.Priority;
@@ -104,10 +105,12 @@ import org.apache.commons.lang3.StringUtils;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -118,7 +121,7 @@ import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
-import com.google.common.collect.Lists;
+import com.google.common.base.Splitter;
 
 /**
  * executor service impl
@@ -207,6 +210,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
      * @param startParams               the global param values which pass to new process instance
      * @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode
      * @param testFlag testFlag
+     * @param executionOrder the execution order when complementing data
      * @return execute process instance code
      */
     @Override
@@ -222,7 +226,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                                                    Map<String, String> startParams, Integer expectedParallelismNumber,
                                                    int dryRun, int testFlag,
                                                    ComplementDependentMode complementDependentMode, Integer version,
-                                                   boolean allLevelDependent) {
+                                                   boolean allLevelDependent, ExecutionOrder executionOrder) {
         Project project = projectMapper.queryByCode(projectCode);
         // check user access for project
         Map<String, Object> result =
@@ -236,6 +240,14 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
             putMsg(result, Status.TASK_TIMEOUT_PARAMS_ERROR);
             return result;
         }
+
+        if (Objects.nonNull(expectedParallelismNumber) && expectedParallelismNumber <= 0) {
+            log.warn("Parameter expectedParallelismNumber is invalid, expectedParallelismNumber:{}.",
+                    expectedParallelismNumber);
+            putMsg(result, Status.TASK_PARALLELISM_PARAMS_ERROR);
+            return result;
+        }
+
         checkValidTenant(tenantCode);
         ProcessDefinition processDefinition;
         if (null != version) {
@@ -264,7 +276,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                         cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority,
                         workerGroup, tenantCode,
                         environmentCode, startParams, expectedParallelismNumber, dryRun, testFlag,
-                        complementDependentMode, allLevelDependent);
+                        complementDependentMode, allLevelDependent, executionOrder);
 
         if (create > 0) {
             processDefinition.setWarningGroupId(warningGroupId);
@@ -738,6 +750,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
      * @param testFlag                testFlag
      * @param environmentCode         environmentCode
      * @param allLevelDependent       allLevelDependent
+     * @param executionOrder          executionOrder
      * @return command id
      */
     private int createCommand(Long triggerCode, CommandType commandType, long processDefineCode, TaskDependType nodeDep,
@@ -747,7 +760,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                               Long environmentCode,
                               Map<String, String> startParams, Integer expectedParallelismNumber, int dryRun,
                               int testFlag, ComplementDependentMode complementDependentMode,
-                              boolean allLevelDependent) {
+                              boolean allLevelDependent, ExecutionOrder executionOrder) {
 
         /**
          * instantiate command schedule instance
@@ -806,7 +819,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                 log.info("Start to create {} command, processDefinitionCode:{}.",
                         command.getCommandType().getDescp(), processDefineCode);
                 return createComplementCommandList(triggerCode, schedule, runMode, command, expectedParallelismNumber,
-                        complementDependentMode, allLevelDependent);
+                        complementDependentMode, allLevelDependent, executionOrder);
             } catch (CronParseException cronParseException) {
                 // We catch the exception here just to make compiler happy, since we have already validated the schedule
                 // cron expression before
@@ -822,174 +835,138 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         }
     }
 
+    private int createComplementCommand(Long triggerCode, Command command, Map<String, String> cmdParam,
+                                        List<ZonedDateTime> dateTimeList, List<Schedule> schedules,
+                                        ComplementDependentMode complementDependentMode, boolean allLevelDependent) {
+
+        String dateTimeListStr = dateTimeList.stream()
+                .map(item -> DateUtils.dateToString(item))
+                .collect(Collectors.joining(COMMA));
+
+        cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateTimeListStr);
+        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+
+        log.info("Creating command, commandInfo:{}.", command);
+        int createCount = commandService.createCommand(command);
+
+        if (createCount > 0) {
+            log.info("Create {} command complete, processDefinitionCode:{}",
+                    command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+        } else {
+            log.error("Create {} command error, processDefinitionCode:{}",
+                    command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+        }
+
+        if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
+            log.info(
+                    "Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.",
+                    command.getProcessDefinitionCode());
+        } else {
+            log.info(
+                    "Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.",
+                    command.getProcessDefinitionCode());
+            createComplementDependentCommand(schedules, command, allLevelDependent);
+        }
+
+        if (createCount > 0) {
+            triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, triggerCode, command.getId());
+        }
+        return createCount;
+    }
+
     /**
      * create complement command
      * close left and close right
      *
      * @param scheduleTimeParam
      * @param runMode
+     * @param executionOrder
      * @return
      */
     protected int createComplementCommandList(Long triggerCode, String scheduleTimeParam, RunMode runMode,
                                               Command command,
                                               Integer expectedParallelismNumber,
                                               ComplementDependentMode complementDependentMode,
-                                              boolean allLevelDependent) throws CronParseException {
+                                              boolean allLevelDependent,
+                                              ExecutionOrder executionOrder) throws CronParseException {
         int createCount = 0;
-        String startDate = null;
-        String endDate = null;
-        String dateList = null;
         int dependentProcessDefinitionCreateCount = 0;
         runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
         Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
         Map<String, String> scheduleParam = JSONUtils.toMap(scheduleTimeParam);
-        if (scheduleParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
-            dateList = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
-            dateList = removeDuplicates(dateList);
+
+        if (Objects.isNull(executionOrder)) {
+            executionOrder = ExecutionOrder.DESC_ORDER;
         }
+
+        List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
+                command.getProcessDefinitionCode());
+
+        List<ZonedDateTime> listDate = new ArrayList<>();
         if (scheduleParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_START_DATE) && scheduleParam.containsKey(
                 CMD_PARAM_COMPLEMENT_DATA_END_DATE)) {
-            startDate = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_START_DATE);
-            endDate = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_END_DATE);
+            String startDate = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_START_DATE);
+            String endDate = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_END_DATE);
+            if (startDate != null && endDate != null) {
+                listDate = CronUtils.getSelfFireDateList(
+                        DateUtils.stringToZoneDateTime(startDate),
+                        DateUtils.stringToZoneDateTime(endDate),
+                        schedules);
+            }
         }
+
+        if (scheduleParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
+            String dateList = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
+
+            if (StringUtils.isNotBlank(dateList)) {
+                listDate = Splitter.on(COMMA).splitToStream(dateList)
+                        .map(item -> DateUtils.stringToZoneDateTime(item.trim()))
+                        .distinct()
+                        .collect(Collectors.toList());
+            }
+        }
+
+        if (CollectionUtils.isEmpty(listDate)) {
+            throw new ServiceException(Status.TASK_COMPLEMENT_DATA_DATE_ERROR);
+        }
+
+        if (executionOrder.equals(ExecutionOrder.DESC_ORDER)) {
+            Collections.sort(listDate, Collections.reverseOrder());
+        } else {
+            Collections.sort(listDate);
+        }
+
         switch (runMode) {
             case RUN_MODE_SERIAL: {
                 log.info("RunMode of {} command is serial run, processDefinitionCode:{}.",
                         command.getCommandType().getDescp(), command.getProcessDefinitionCode());
-                if (StringUtils.isNotEmpty(dateList)) {
-                    cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateList);
-                    command.setCommandParam(JSONUtils.toJsonString(cmdParam));
-                    log.info("Creating command, commandInfo:{}.", command);
-                    createCount = commandService.createCommand(command);
-                    if (createCount > 0) {
-                        log.info("Create {} command complete, processDefinitionCode:{}",
-                                command.getCommandType().getDescp(), command.getProcessDefinitionCode());
-                    } else {
-                        log.error("Create {} command error, processDefinitionCode:{}",
-                                command.getCommandType().getDescp(), command.getProcessDefinitionCode());
-                    }
-                }
-                if (startDate != null && endDate != null) {
-                    cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_START_DATE, startDate);
-                    cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_END_DATE, endDate);
-                    command.setCommandParam(JSONUtils.toJsonString(cmdParam));
-                    log.info("Creating command, commandInfo:{}.", command);
-                    createCount = commandService.createCommand(command);
-                    if (createCount > 0) {
-                        log.info("Create {} command complete, processDefinitionCode:{}",
-                                command.getCommandType().getDescp(), command.getProcessDefinitionCode());
-                    } else {
-                        log.error("Create {} command error, processDefinitionCode:{}",
-                                command.getCommandType().getDescp(), command.getProcessDefinitionCode());
-                    }
-                    // dependent process definition
-                    List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
-                            command.getProcessDefinitionCode());
-
-                    if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
-                        log.info(
-                                "Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.",
-                                command.getProcessDefinitionCode());
-                    } else {
-                        log.info(
-                                "Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.",
-                                command.getProcessDefinitionCode());
-                        dependentProcessDefinitionCreateCount +=
-                                createComplementDependentCommand(schedules, command, allLevelDependent);
-                    }
-                }
-                if (createCount > 0) {
-                    triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, triggerCode, command.getId());
-                }
+                createCount = createComplementCommand(triggerCode, command, cmdParam, listDate, schedules,
+                        complementDependentMode, allLevelDependent);
                 break;
             }
             case RUN_MODE_PARALLEL: {
                 log.info("RunMode of {} command is parallel run, processDefinitionCode:{}.",
                         command.getCommandType().getDescp(), command.getProcessDefinitionCode());
-                if (startDate != null && endDate != null) {
-                    List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
-                            command.getProcessDefinitionCode());
-                    List<ZonedDateTime> listDate = CronUtils.getSelfFireDateList(
-                            DateUtils.stringToZoneDateTime(startDate),
-                            DateUtils.stringToZoneDateTime(endDate),
-                            schedules);
-                    int listDateSize = listDate.size();
-                    createCount = listDate.size();
-                    if (!CollectionUtils.isEmpty(listDate)) {
-                        if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
-                            createCount = Math.min(createCount, expectedParallelismNumber);
-                        }
-                        log.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.",
-                                createCount);
-
-                        // Distribute the number of tasks equally to each command.
-                        // The last command with insufficient quantity will be assigned to the remaining tasks.
-                        int itemsPerCommand = (listDateSize / createCount);
-                        int remainingItems = (listDateSize % createCount);
-                        int startDateIndex = 0;
-                        int endDateIndex = 0;
-
-                        for (int i = 1; i <= createCount; i++) {
-                            int extra = (i <= remainingItems) ? 1 : 0;
-                            int singleCommandItems = (itemsPerCommand + extra);
-
-                            if (i == 1) {
-                                endDateIndex += singleCommandItems - 1;
-                            } else {
-                                startDateIndex = endDateIndex + 1;
-                                endDateIndex += singleCommandItems;
-                            }
-
-                            cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_START_DATE,
-                                    DateUtils.dateToString(listDate.get(startDateIndex)));
-                            cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_END_DATE,
-                                    DateUtils.dateToString(listDate.get(endDateIndex)));
-                            command.setCommandParam(JSONUtils.toJsonString(cmdParam));
-                            log.info("Creating command, commandInfo:{}.", command);
-                            if (commandService.createCommand(command) > 0) {
-                                log.info("Create {} command complete, processDefinitionCode:{}",
-                                        command.getCommandType().getDescp(), command.getProcessDefinitionCode());
-                                triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, triggerCode,
-                                        command.getId());
-                            } else {
-                                log.error("Create {} command error, processDefinitionCode:{}",
-                                        command.getCommandType().getDescp(), command.getProcessDefinitionCode());
-                            }
-                            if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
-                                log.info(
-                                        "Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.",
-                                        command.getProcessDefinitionCode());
-                            } else {
-                                log.info(
-                                        "Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.",
-                                        command.getProcessDefinitionCode());
-                                dependentProcessDefinitionCreateCount +=
-                                        createComplementDependentCommand(schedules, command, allLevelDependent);
-                            }
-                        }
+
+                int queueNum = 0;
+                if (CollectionUtils.isNotEmpty(listDate)) {
+                    queueNum = listDate.size();
+                    if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
+                        queueNum = Math.min(queueNum, expectedParallelismNumber);
                     }
-                }
-                if (StringUtils.isNotEmpty(dateList)) {
-                    List<String> listDate = Arrays.asList(dateList.split(COMMA));
-                    createCount = listDate.size();
-                    if (!CollectionUtils.isEmpty(listDate)) {
-                        if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
-                            createCount = Math.min(createCount, expectedParallelismNumber);
-                        }
-                        log.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.",
-                                createCount);
-                        for (List<String> stringDate : Lists.partition(listDate, createCount)) {
-                            cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, String.join(COMMA, stringDate));
-                            command.setCommandParam(JSONUtils.toJsonString(cmdParam));
-                            log.info("Creating command, commandInfo:{}.", command);
-                            if (commandService.createCommand(command) > 0) {
-                                log.info("Create {} command complete, processDefinitionCode:{}",
-                                        command.getCommandType().getDescp(), command.getProcessDefinitionCode());
-                            } else {
-                                log.error("Create {} command error, processDefinitionCode:{}",
-                                        command.getCommandType().getDescp(), command.getProcessDefinitionCode());
-                            }
+                    log.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.",
+                            queueNum);
+                    List[] queues = new List[queueNum];
+
+                    for (int i = 0; i < listDate.size(); i++) {
+                        if (Objects.isNull(queues[i % queueNum])) {
+                            queues[i % queueNum] = new ArrayList();
                         }
+                        queues[i % queueNum].add(listDate.get(i));
+                    }
+                    for (List queue : queues) {
+                        createCount = createComplementCommand(triggerCode, command, cmdParam, queue, schedules,
+                                complementDependentMode, allLevelDependent);
                     }
                 }
                 break;

+ 12 - 4
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecuteFunctionControllerTest.java

@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.api.service.ExecutorService;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
+import org.apache.dolphinscheduler.common.enums.ExecutionOrder;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.RunMode;
@@ -68,6 +69,7 @@ public class ExecuteFunctionControllerTest extends AbstractControllerTest {
     final WarningType warningType = WarningType.NONE;
     final int warningGroupId = 3;
     final RunMode runMode = RunMode.RUN_MODE_SERIAL;
+    final ExecutionOrder executionOrder = ExecutionOrder.DESC_ORDER;
     final Priority processInstancePriority = Priority.HIGH;
     final String workerGroup = "workerGroup";
     final String tenantCode = "root";
@@ -112,6 +114,7 @@ public class ExecuteFunctionControllerTest extends AbstractControllerTest {
         paramsMap.add("expectedParallelismNumber", String.valueOf(expectedParallelismNumber));
         paramsMap.add("dryRun", String.valueOf(dryRun));
         paramsMap.add("testFlag", String.valueOf(testFlag));
+        paramsMap.add("executionOrder", String.valueOf(executionOrder));
 
         when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode),
                 eq(scheduleTime), eq(execType), eq(failureStrategy), eq(startNodeList), eq(taskDependType),
@@ -120,7 +123,7 @@ public class ExecuteFunctionControllerTest extends AbstractControllerTest {
                 eq(environmentCode),
                 eq(timeout), eq(startParams), eq(expectedParallelismNumber), eq(dryRun), eq(testFlag),
                 eq(complementDependentMode), eq(version),
-                eq(allLevelDependent)))
+                eq(allLevelDependent), eq(executionOrder)))
                         .thenReturn(executeServiceResult);
 
         // When
@@ -158,6 +161,7 @@ public class ExecuteFunctionControllerTest extends AbstractControllerTest {
         paramsMap.add("expectedParallelismNumber", String.valueOf(expectedParallelismNumber));
         paramsMap.add("dryRun", String.valueOf(dryRun));
         paramsMap.add("testFlag", String.valueOf(testFlag));
+        paramsMap.add("executionOrder", String.valueOf(executionOrder));
 
         when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode),
                 eq(scheduleTime), eq(execType), eq(failureStrategy), eq(startNodeList), eq(taskDependType),
@@ -166,7 +170,8 @@ public class ExecuteFunctionControllerTest extends AbstractControllerTest {
                 eq(environmentCode),
                 eq(Constants.MAX_TASK_TIMEOUT), eq(startParams), eq(expectedParallelismNumber), eq(dryRun),
                 eq(testFlag),
-                eq(complementDependentMode), eq(version), eq(allLevelDependent))).thenReturn(executeServiceResult);
+                eq(complementDependentMode), eq(version), eq(allLevelDependent), eq(executionOrder)))
+                        .thenReturn(executeServiceResult);
 
         // When
         final MvcResult mvcResult = mockMvc
@@ -203,6 +208,7 @@ public class ExecuteFunctionControllerTest extends AbstractControllerTest {
         paramsMap.add("expectedParallelismNumber", String.valueOf(expectedParallelismNumber));
         paramsMap.add("dryRun", String.valueOf(dryRun));
         paramsMap.add("testFlag", String.valueOf(testFlag));
+        paramsMap.add("executionOrder", String.valueOf(executionOrder));
 
         when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode),
                 eq(scheduleTime), eq(execType), eq(failureStrategy), eq(startNodeList), eq(taskDependType),
@@ -210,7 +216,8 @@ public class ExecuteFunctionControllerTest extends AbstractControllerTest {
                 eq(warningGroupId), eq(runMode), eq(processInstancePriority), eq(workerGroup), eq(tenantCode),
                 eq(environmentCode),
                 eq(timeout), eq(null), eq(expectedParallelismNumber), eq(dryRun), eq(testFlag),
-                eq(complementDependentMode), eq(version), eq(allLevelDependent))).thenReturn(executeServiceResult);
+                eq(complementDependentMode), eq(version), eq(allLevelDependent), eq(executionOrder)))
+                        .thenReturn(executeServiceResult);
 
         // When
         final MvcResult mvcResult = mockMvc
@@ -239,7 +246,8 @@ public class ExecuteFunctionControllerTest extends AbstractControllerTest {
                 eq(scheduleTime), eq(null), eq(failureStrategy), eq(null), eq(null), eq(warningType),
                 eq(null), eq(null), eq(null), eq("default"), eq("default"), eq(-1L),
                 eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0), eq(0),
-                eq(complementDependentMode), eq(version), eq(allLevelDependent))).thenReturn(executeServiceResult);
+                eq(complementDependentMode), eq(version), eq(allLevelDependent), eq(null)))
+                        .thenReturn(executeServiceResult);
 
         // When
         final MvcResult mvcResult = mockMvc

+ 30 - 17
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java

@@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
+import org.apache.dolphinscheduler.common.enums.ExecutionOrder;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
@@ -280,10 +281,12 @@ public class ExecuteFunctionServiceTest {
                 null, null,
                 null, null, null,
                 RunMode.RUN_MODE_SERIAL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 10, null, 0, Constants.DRY_RUN_FLAG_NO,
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 10, null, null,
+                Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE, null,
-                false);
+                false,
+                ExecutionOrder.DESC_ORDER);
         Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         verify(commandService, times(1)).createCommand(any(Command.class));
 
@@ -305,10 +308,12 @@ public class ExecuteFunctionServiceTest {
                 null, "123456789,987654321",
                 null, null, null,
                 RunMode.RUN_MODE_SERIAL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, null,
+                Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE, null,
-                false);
+                false,
+                ExecutionOrder.DESC_ORDER);
         Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         verify(commandService, times(1)).createCommand(any(Command.class));
 
@@ -332,7 +337,8 @@ public class ExecuteFunctionServiceTest {
                     Constants.DRY_RUN_FLAG_NO,
                     Constants.TEST_FLAG_NO,
                     ComplementDependentMode.OFF_MODE, null,
-                    false);
+                    false,
+                    ExecutionOrder.DESC_ORDER);
         } catch (ServiceException e) {
             Assertions.assertEquals(Status.START_NODE_NOT_EXIST_IN_LAST_PROCESS.getCode(), e.getCode());
         }
@@ -413,10 +419,11 @@ public class ExecuteFunctionServiceTest {
                 null, null,
                 null, null, null,
                 RunMode.RUN_MODE_SERIAL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 2, Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE, null,
-                false);
+                false,
+                ExecutionOrder.DESC_ORDER);
         Assertions.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS));
         verify(commandService, times(0)).createCommand(any(Command.class));
     }
@@ -437,10 +444,12 @@ public class ExecuteFunctionServiceTest {
                 null, null,
                 null, null, null,
                 RunMode.RUN_MODE_SERIAL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, null,
+                Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE, null,
-                false);
+                false,
+                ExecutionOrder.DESC_ORDER);
         Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         verify(commandService, times(1)).createCommand(any(Command.class));
     }
@@ -461,13 +470,14 @@ public class ExecuteFunctionServiceTest {
                 null, null,
                 null, null, null,
                 RunMode.RUN_MODE_PARALLEL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 2, Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE, null,
-                false);
-        Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
-        verify(commandService, times(31)).createCommand(any(Command.class));
+                false,
+                ExecutionOrder.DESC_ORDER);
 
+        Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
+        verify(commandService, times(2)).createCommand(any(Command.class));
     }
 
     /**
@@ -490,7 +500,8 @@ public class ExecuteFunctionServiceTest {
                 Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE, null,
-                false);
+                false,
+                ExecutionOrder.DESC_ORDER);
         Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         verify(commandService, times(15)).createCommand(any(Command.class));
 
@@ -518,11 +529,12 @@ public class ExecuteFunctionServiceTest {
                 100L,
                 110,
                 null,
-                0,
+                null,
                 Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE, null,
-                false));
+                false,
+                ExecutionOrder.DESC_ORDER));
     }
 
     @Test
@@ -555,7 +567,8 @@ public class ExecuteFunctionServiceTest {
                 Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_YES,
                 ComplementDependentMode.OFF_MODE, null,
-                false);
+                false,
+                ExecutionOrder.DESC_ORDER);
         Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
     }
 

+ 50 - 0
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionOrder.java

@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.enums;
+
+import com.baomidou.mybatisplus.annotation.EnumValue;
+
+/**
+ * complement data in some kind of order
+ */
+public enum ExecutionOrder {
+
+    /**
+     * 0 complement data in descending order
+     * 1 complement data in ascending order
+     */
+    DESC_ORDER(0, "descending order"),
+    ASC_ORDER(1, "ascending order");
+
+    ExecutionOrder(int code, String desc) {
+        this.code = code;
+        this.desc = desc;
+    }
+
+    @EnumValue
+    private final int code;
+    private final String desc;
+
+    public int getCode() {
+        return code;
+    }
+
+    public String getDesc() {
+        return desc;
+    }
+}

+ 3 - 0
dolphinscheduler-ui/src/locales/en_US/project.ts

@@ -140,6 +140,9 @@ export default {
     parallelism: 'Parallelism',
     custom_parallelism: 'Custom Parallelism',
     please_enter_parallelism: 'Please enter Parallelism',
+    order_of_execution: 'Order of execution',
+    ascending_order: 'In ascending order',
+    descending_order: 'In descending order',
     please_choose: 'Please Choose',
     start_time: 'Start Time',
     end_time: 'End Time',

+ 3 - 0
dolphinscheduler-ui/src/locales/zh_CN/project.ts

@@ -141,6 +141,9 @@ export default {
     parallelism: '并行度',
     custom_parallelism: '自定义并行度',
     please_enter_parallelism: '请输入并行度',
+    order_of_execution: '执行顺序',
+    ascending_order: '按日期升序执行',
+    descending_order: '按日期降序执行',
     please_choose: '请选择',
     start_time: '开始时间',
     end_time: '结束时间',

+ 15 - 0
dolphinscheduler-ui/src/views/projects/workflow/definition/components/start-modal.tsx

@@ -432,6 +432,21 @@ export default defineComponent({
                     />
                   </NFormItem>
                 )}
+                <NFormItem
+                    label={t('project.workflow.order_of_execution')}
+                    path='executionOrder'
+                >
+                  <NRadioGroup v-model:value={this.startForm.executionOrder}>
+                    <NSpace>
+                      <NRadio value={'DESC_ORDER'}>
+                        {t('project.workflow.descending_order')}
+                      </NRadio>
+                      <NRadio value={'ASC_ORDER'}>
+                        {t('project.workflow.ascending_order')}
+                      </NRadio>
+                    </NSpace>
+                  </NRadioGroup>
+                </NFormItem>
                 <NFormItem
                   label={t('project.workflow.schedule_date')}
                   path={

+ 2 - 1
dolphinscheduler-ui/src/views/projects/workflow/definition/components/use-form.ts

@@ -70,7 +70,8 @@ export const useForm = () => {
       dryRun: 0,
       testFlag: 0,
       version: null,
-      allLevelDependent: 'false'
+      allLevelDependent: 'false',
+      executionOrder: 'DESC_ORDER',
     },
     saving: false,
     rules: {