Sfoglia il codice sorgente

[Bug](dependent) Dependent downstream trigger error when schedule cycle not day. (#11734)

* FIX: dependent

* FIX: version

* MOD: for review
Stalary 2 anni fa
parent
commit
37325b4c34

+ 5 - 3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@@ -907,6 +907,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         dependentCommand.setTaskDependType(TaskDependType.TASK_POST);
         for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
             dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode());
+            dependentCommand.setProcessDefinitionVersion(dependentProcessDefinition.getProcessDefinitionVersion());
             dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup());
             Map<String, String> cmdParam = JSONUtils.toMap(dependentCommand.getCommandParam());
             cmdParam.put(CMD_PARAM_START_NODES, String.valueOf(dependentProcessDefinition.getTaskDefinitionCode()));
@@ -927,7 +928,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                 processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
 
         return checkDependentProcessDefinitionValid(dependentProcessDefinitionList, processDefinitionCycle,
-                workerGroup);
+                workerGroup, processDefinitionCode);
     }
 
     /**
@@ -938,7 +939,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
     private List<DependentProcessDefinition> checkDependentProcessDefinitionValid(
                                                                                   List<DependentProcessDefinition> dependentProcessDefinitionList,
                                                                                   CycleEnum processDefinitionCycle,
-                                                                                  String workerGroup) {
+                                                                                  String workerGroup,
+                                                                                  long upstreamProcessDefinitionCode) {
         List<DependentProcessDefinition> validDependentProcessDefinitionList = new ArrayList<>();
 
         List<Long> processDefinitionCodeList =
@@ -949,7 +951,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                 processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);
 
         for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
-            if (dependentProcessDefinition.getDependentCycle() == processDefinitionCycle) {
+            if (dependentProcessDefinition.getDependentCycle(upstreamProcessDefinitionCode) == processDefinitionCycle) {
                 if (processDefinitionWorkerGroupMap
                         .get(dependentProcessDefinition.getProcessDefinitionCode()) == null) {
                     dependentProcessDefinition.setWorkerGroup(workerGroup);

+ 15 - 2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java

@@ -41,6 +41,11 @@ public class DependentProcessDefinition {
      */
     private String processDefinitionName;
 
+    /**
+     * process definition version
+     **/
+    private int processDefinitionVersion;
+
     /**
      * task definition name
      */
@@ -60,14 +65,14 @@ public class DependentProcessDefinition {
      * get dependent cycle
      * @return CycleEnum
      */
-    public CycleEnum getDependentCycle() {
+    public CycleEnum getDependentCycle(long upstreamProcessDefinitionCode) {
         DependentParameters dependentParameters = this.getDependentParameters();
         List<DependentTaskModel> dependentTaskModelList = dependentParameters.getDependTaskList();
 
         for (DependentTaskModel dependentTaskModel : dependentTaskModelList) {
             List<DependentItem> dependentItemList = dependentTaskModel.getDependItemList();
             for (DependentItem dependentItem : dependentItemList) {
-                if (this.getProcessDefinitionCode() == dependentItem.getDefinitionCode()) {
+                if (upstreamProcessDefinitionCode == dependentItem.getDefinitionCode()) {
                     return cycle2CycleEnum(dependentItem.getCycle());
                 }
             }
@@ -122,6 +127,14 @@ public class DependentProcessDefinition {
         this.processDefinitionCode = code;
     }
 
+    public int getProcessDefinitionVersion() {
+        return processDefinitionVersion;
+    }
+
+    public void setProcessDefinitionVersion(int processDefinitionVersion) {
+        this.processDefinitionVersion = processDefinitionVersion;
+    }
+
     public long getTaskDefinitionCode() {
         return this.taskDefinitionCode;
     }

+ 1 - 0
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml

@@ -149,6 +149,7 @@
         SELECT
         c.code AS process_definition_code
         ,c.name AS process_definition_name
+        ,c.version as process_definition_version
         ,a.code AS task_definition_code
         ,a.task_params
         FROM