Prechádzať zdrojové kódy

feat: Python add online_schedule for workflow control schedule state (#13551)

related: apache/dolphinscheduler-sdk-python#73
Jay Chung 2 rokov pred
rodič
commit
30fd43e32b

+ 11 - 4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

@@ -220,6 +220,7 @@ public class PythonGateway {
      * @param globalParams global params
      * @param schedule schedule for workflow, will not set schedule if null,
      * and if would always fresh exists schedule if not null
+     * @param onlineSchedule Whether set the workflow's schedule to online state
      * @param warningType warning type
      * @param warningGroupId warning group id
      * @param timeout timeout for workflow working, if running time longer than timeout,
@@ -236,6 +237,7 @@ public class PythonGateway {
                                        String description,
                                        String globalParams,
                                        String schedule,
+                                       boolean onlineSchedule,
                                        String warningType,
                                        int warningGroupId,
                                        int timeout,
@@ -277,7 +279,8 @@ public class PythonGateway {
 
         // Fresh workflow schedule
         if (schedule != null) {
-            createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup, warningType,
+            createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, onlineSchedule, workerGroup,
+                    warningType,
                     warningGroupId);
         }
         processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode,
@@ -319,6 +322,7 @@ public class PythonGateway {
      * @param projectCode project which workflow belongs to
      * @param workflowCode workflow code
      * @param schedule schedule expression
+     * @param onlineSchedule Whether set the workflow's schedule to online state
      * @param workerGroup work group
      * @param warningType warning type
      * @param warningGroupId warning group id
@@ -327,6 +331,7 @@ public class PythonGateway {
                                         long projectCode,
                                         long workflowCode,
                                         String schedule,
+                                        boolean onlineSchedule,
                                         String workerGroup,
                                         String warningType,
                                         int warningGroupId) {
@@ -347,9 +352,11 @@ public class PythonGateway {
             schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, WarningType.valueOf(warningType),
                     warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
         }
-        // Always set workflow online to set schedule online
-        processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode, ReleaseState.ONLINE);
-        schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE);
+        if (onlineSchedule) {
+            // set workflow online to make sure we can set schedule online
+            processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode, ReleaseState.ONLINE);
+            schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE);
+        }
     }
 
     public void execWorkflowInstance(String userName,