Bläddra i källkod

[Fix-6347][Master-API]fix bug #6347 the first schedule_time is error in complement data (#6389)

* fix bug #6347  complement data errors
OS 3 år sedan
förälder
incheckning
c013b49e72

+ 63 - 51
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@@ -551,66 +551,78 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
             if (interval.length == 2) {
                 start = DateUtils.getScheduleDate(interval[0]);
                 end = DateUtils.getScheduleDate(interval[1]);
+                if (start.after(end)) {
+                    logger.info("complement data error, wrong date start:{} and end date:{} ",
+                            start, end
+                    );
+                    return 0;
+                }
             }
         }
         // determine whether to complement
         if (commandType == CommandType.COMPLEMENT_DATA) {
-            runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
-            if (null != start && null != end && !start.after(end)) {
-                if (runMode == RunMode.RUN_MODE_SERIAL) {
-                    cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
-                    cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end));
-                    command.setCommandParam(JSONUtils.toJsonString(cmdParam));
-                    return processService.createCommand(command);
-                } else if (runMode == RunMode.RUN_MODE_PARALLEL) {
-                    List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefineCode);
-                    LinkedList<Date> listDate = new LinkedList<>();
-                    if (!CollectionUtils.isEmpty(schedules)) {
-                        for (Schedule item : schedules) {
-                            listDate.addAll(CronUtils.getSelfFireDateList(start, end, item.getCrontab()));
-                        }
-                    }
-                    if (!CollectionUtils.isEmpty(listDate)) {
-                        int effectThreadsCount = expectedParallelismNumber == null ? listDate.size() : Math.min(listDate.size(), expectedParallelismNumber);
-                        logger.info("In parallel mode, current expectedParallelismNumber:{}", effectThreadsCount);
-
-                        int chunkSize = listDate.size() / effectThreadsCount;
-                        listDate.addFirst(start);
-                        listDate.addLast(end);
-
-                        for (int i = 0; i < effectThreadsCount; i++) {
-                            int rangeStart = i == 0 ? i : (i * chunkSize);
-                            int rangeEnd = i == effectThreadsCount - 1 ? listDate.size() - 1
-                                    : rangeStart + chunkSize + 1;
-                            cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(rangeStart)));
-                            cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(rangeEnd)));
-                            command.setCommandParam(JSONUtils.toJsonString(cmdParam));
-                            processService.createCommand(command);
-                        }
-
-                        return effectThreadsCount;
-                    } else {
-                        // loop by day
-                        int runCunt = 0;
-                        while (!start.after(end)) {
-                            runCunt += 1;
-                            cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
-                            cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(start));
-                            command.setCommandParam(JSONUtils.toJsonString(cmdParam));
-                            processService.createCommand(command);
-                            start = DateUtils.getSomeDay(start, 1);
-                        }
-                        return runCunt;
-                    }
-                }
-            } else {
-                logger.error("there is not valid schedule date for the process definition code:{}", processDefineCode);
+            if (start == null || end == null) {
+                return 0;
             }
+            return createComplementCommandList(start, end, runMode, command, expectedParallelismNumber);
         } else {
             command.setCommandParam(JSONUtils.toJsonString(cmdParam));
             return processService.createCommand(command);
         }
+    }
 
-        return 0;
+    /**
+     * create complement command
+     * close left open right
+     *
+     * @param start
+     * @param end
+     * @param runMode
+     * @return
+     */
+    private int createComplementCommandList(Date start, Date end, RunMode runMode, Command command, Integer expectedParallelismNumber) {
+        int createCount = 0;
+        runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
+        Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
+        switch (runMode) {
+            case RUN_MODE_SERIAL: {
+                cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
+                cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end));
+                command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+                createCount = processService.createCommand(command);
+                break;
+            }
+            case RUN_MODE_PARALLEL: {
+                LinkedList<Date> listDate = new LinkedList<>();
+                List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
+                listDate.addAll(CronUtils.getSelfFireDateList(start, end, schedules));
+                createCount = listDate.size();
+                if (!CollectionUtils.isEmpty(listDate)) {
+                    if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
+                        createCount = Math.min(listDate.size(), expectedParallelismNumber);
+                    }
+                    logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount);
+                    int chunkSize = listDate.size() / createCount;
+
+                    for (int i = 0; i < createCount; i++) {
+                        int rangeStart = i == 0 ? i : (i * chunkSize);
+                        int rangeEnd = i == createCount - 1 ? listDate.size() - 1
+                                : rangeStart + chunkSize;
+                        if (rangeEnd == listDate.size()) {
+                            rangeEnd = listDate.size() - 1;
+                        }
+                        cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(rangeStart)));
+                        cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(rangeEnd)));
+                        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+                        processService.createCommand(command);
+                    }
+                }
+                break;
+            }
+            default:
+                break;
+        }
+        logger.info("create complement command count: {}", createCount);
+        return createCount;
     }
 }

+ 10 - 0
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@@ -449,6 +449,11 @@ public final class Constants {
      */
     public static final String CMDPARAM_COMPLEMENT_DATA_END_DATE = "complementEndDate";
 
+    /**
+     * complement date default cron string
+     */
+    public static final String DEFAULT_CRON_STRING = "0 0 0 * * ? *";
+
 
     /**
      * data source config
@@ -503,6 +508,11 @@ public final class Constants {
      */
     public static final int SLEEP_TIME_MILLIS = 1000;
 
+    /**
+     * one second mils
+     */
+    public static final int SECOND_TIME_MILLIS = 1000;
+
     /**
      * master task instance cache-database refresh interval
      */

+ 17 - 8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@@ -436,11 +436,15 @@ public class WorkflowExecuteThread implements Runnable {
             scheduleDate = complementListDate.get(0);
         } else if (processInstance.getState().typeIsFinished()) {
             endProcess();
+            if (complementListDate.size() <= 0) {
+                logger.info("process complement end. process id:{}", processInstance.getId());
+                return true;
+            }
             int index = complementListDate.indexOf(scheduleDate);
             if (index >= complementListDate.size() - 1 || !processInstance.getState().typeIsSuccess()) {
                 logger.info("process complement end. process id:{}", processInstance.getId());
                 // complement data ends || no success
-                return false;
+                return true;
             }
             logger.info("process complement continue. process id:{}, schedule time:{} complementListDate:{}",
                     processInstance.getId(),
@@ -559,14 +563,19 @@ public class WorkflowExecuteThread implements Runnable {
             }
         }
 
-        if (complementListDate.size() == 0 && needComplementProcess()) {
-            complementListDate = processService.getComplementDateList(
-                    JSONUtils.toMap(processInstance.getCommandParam()),
-                    processInstance.getProcessDefinitionCode());
-            logger.info(" process definition code:{} complement data: {}",
-                processInstance.getProcessDefinitionCode(), complementListDate.toString());
+        if (processInstance.isComplementData() && complementListDate.size() == 0) {
+            Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
+            if (cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
+                Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
+                Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
+                List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
+                if (complementListDate.size() == 0 && needComplementProcess()) {
+                    complementListDate = CronUtils.getSelfFireDateList(start, end, schedules);
+                    logger.info(" process definition code:{} complement data: {}",
+                            processInstance.getProcessDefinitionCode(), complementListDate.toString());
+                }
+            }
         }
-
     }
 
     /**

+ 10 - 24
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@@ -587,7 +587,12 @@ public class ProcessService {
         if (scheduleTime == null
                 && cmdParam != null
                 && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
-            List<Date> complementDateList = getComplementDateList(cmdParam, command.getProcessDefinitionCode());
+
+            Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
+            Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
+            List<Schedule> schedules = queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
+            List<Date> complementDateList = CronUtils.getSelfFireDateList(start, end, schedules);
+
             if (complementDateList.size() > 0) {
                 scheduleTime = complementDateList.get(0);
             } else {
@@ -972,7 +977,10 @@ public class ProcessService {
             return;
         }
 
-        List<Date> complementDate = getComplementDateList(cmdParam, processInstance.getProcessDefinitionCode());
+        Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
+        Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
+        List<Schedule> listSchedules = queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
+        List<Date> complementDate = CronUtils.getSelfFireDateList(start, end, listSchedules);
 
         if (complementDate.size() > 0
                 && Flag.NO == processInstance.getIsSubProcess()) {
@@ -982,28 +990,6 @@ public class ProcessService {
             processDefinition.getGlobalParamMap(),
             processDefinition.getGlobalParamList(),
             CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
-
-    }
-
-    /**
-     *  return complement date list
-     *
-     * @param cmdParam
-     * @param processDefinitionCode
-     * @return
-     */
-    public List<Date> getComplementDateList(Map<String, String> cmdParam, Long processDefinitionCode) {
-        List<Date> result = new ArrayList<>();
-        Date startDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
-        Date endDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
-        if (startDate.after(endDate)) {
-            Date tmp = startDate;
-            startDate = endDate;
-            endDate = tmp;
-        }
-        List<Schedule> schedules = queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode);
-        result.addAll(CronUtils.getSelfFireDateList(startDate, endDate, schedules));
-        return result;
     }
 
     /**

+ 14 - 11
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java

@@ -25,6 +25,7 @@ import static org.apache.dolphinscheduler.service.quartz.cron.CycleFactory.week;
 
 import static com.cronutils.model.CronType.QUARTZ;
 
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CycleEnum;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
@@ -182,18 +183,20 @@ public class CronUtils {
      * gets all scheduled times for a period of time based on self dependency
      * if schedulers is empty then default scheduler = 1 day
      */
-    public static List<Date> getSelfFireDateList(Date startTime, Date endTime, List<Schedule> schedules) {
+    public static List<Date> getSelfFireDateList(final Date startTime, final Date endTime, final List<Schedule> schedules) {
         List<Date> result = new ArrayList<>();
-        if (!CollectionUtils.isEmpty(schedules)) {
-            for (Schedule schedule : schedules) {
-                result.addAll(CronUtils.getSelfFireDateList(startTime, endTime, schedule.getCrontab()));
-            }
-        } else {
-            Date start = startTime;
-            for (int i = 0; start.before(endTime); i++) {
-                start = DateUtils.getSomeDay(startTime, i);
-                result.add(start);
-            }
+        Date from = new Date(startTime.getTime() - Constants.SECOND_TIME_MILLIS);
+        Date to = new Date(endTime.getTime() - Constants.SECOND_TIME_MILLIS);
+
+        List<Schedule> listSchedule = new ArrayList<>();
+        listSchedule.addAll(schedules);
+        if (CollectionUtils.isEmpty(listSchedule)) {
+            Schedule schedule = new Schedule();
+            schedule.setCrontab(Constants.DEFAULT_CRON_STRING);
+            listSchedule.add(schedule);
+        }
+        for (Schedule schedule : listSchedule) {
+            result.addAll(CronUtils.getSelfFireDateList(from, to, schedule.getCrontab()));
         }
         return result;
     }

+ 10 - 1
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtilsTest.java

@@ -167,7 +167,7 @@ public class CronUtilsTest {
     }
 
     @Test
-    public void getSelfFireDateList() throws ParseException{
+    public void getSelfFireDateList() throws ParseException {
         Date from = DateUtils.stringToDate("2020-01-01 00:00:00");
         Date to = DateUtils.stringToDate("2020-01-31 00:00:00");
         // test date
@@ -179,6 +179,15 @@ public class CronUtilsTest {
         // test other
         Assert.assertEquals(30, CronUtils.getFireDateList(from, to, CronUtils.parse2CronExpression("0 0 0 * * ? ")).size());
         Assert.assertEquals(5, CronUtils.getSelfFireDateList(from, to, CronUtils.parse2CronExpression("0 0 0 * * ? "), 5).size());
+        from = DateUtils.stringToDate("2020-01-01 00:02:00");
+        to = DateUtils.stringToDate("2020-01-01 00:02:00");
+        Assert.assertEquals(1, CronUtils.getFireDateList(new Date(from.getTime() - 1000), to, CronUtils.parse2CronExpression("0 * * * * ? ")).size());
+
+        from = DateUtils.stringToDate("2020-01-01 00:02:00");
+        to = DateUtils.stringToDate("2020-01-01 00:04:00");
+        Assert.assertEquals(2, CronUtils.getFireDateList(new Date(from.getTime() - 1000),
+                new Date(to.getTime() - 1000),
+                CronUtils.parse2CronExpression("0 * * * * ? ")).size());
     }
 
     @Test