Переглянути джерело

fix: start param for wf not work (#15544)

* fix: start param for wf not work

fix: #15280

* fix test

(cherry picked from commit 01eb8f834ff4abf662b704379f1bcd019f4d2d74)
Jay Chung 1 рік тому
батько
коміт
fa7b4c1c8f

+ 1 - 38
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.master.runner;
 import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_END_DATE;
 import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
 import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_START_DATE;
-import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_FATHER_PARAMS;
 import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING;
 import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
 import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_NODES;
@@ -29,8 +28,6 @@ import static org.apache.dolphinscheduler.common.constants.Constants.COMMA;
 import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_WORKER_GROUP;
 import static org.apache.dolphinscheduler.common.constants.DateConstants.YYYY_MM_DD_HH_MM_SS;
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
 
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -860,7 +857,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
             Map<String, String> cmdParam = JSONUtils.toMap(workflowInstance.getCommandParam());
             if (cmdParam != null) {
                 // reset global params while there are start parameters
-                setGlobalParamIfCommanded(workflowDefinition, cmdParam);
+                processService.setGlobalParamIfCommanded(workflowDefinition, cmdParam);
 
                 Date start = null;
                 Date end = null;
@@ -2057,40 +2054,6 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
         return waitToRetryTaskInstanceMap;
     }
 
-    private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
-        // get start params from command param
-        Map<String, String> startParamMap = new HashMap<>();
-        if (cmdParam.containsKey(CMD_PARAM_START_PARAMS)) {
-            String startParamJson = cmdParam.get(CMD_PARAM_START_PARAMS);
-            startParamMap = JSONUtils.toMap(startParamJson);
-        }
-        Map<String, String> fatherParamMap = new HashMap<>();
-        if (cmdParam.containsKey(CMD_PARAM_FATHER_PARAMS)) {
-            String fatherParamJson = cmdParam.get(CMD_PARAM_FATHER_PARAMS);
-            fatherParamMap = JSONUtils.toMap(fatherParamJson);
-        }
-        startParamMap.putAll(fatherParamMap);
-        // set start param into global params
-        Map<String, String> globalMap = processDefinition.getGlobalParamMap();
-        List<Property> globalParamList = processDefinition.getGlobalParamList();
-        if (startParamMap.size() > 0 && globalMap != null) {
-            // start param to overwrite global param
-            for (Map.Entry<String, String> param : globalMap.entrySet()) {
-                String val = startParamMap.get(param.getKey());
-                if (val != null) {
-                    param.setValue(val);
-                }
-            }
-            // start param to create new global param if global not exist
-            for (Map.Entry<String, String> startParam : startParamMap.entrySet()) {
-                if (!globalMap.containsKey(startParam.getKey())) {
-                    globalMap.put(startParam.getKey(), startParam.getValue());
-                    globalParamList.add(new Property(startParam.getKey(), IN, VARCHAR, startParam.getValue()));
-                }
-            }
-        }
-    }
-
     /**
      * clear related data if command of process instance is EXECUTE_TASK
      * 1. find all task code from sub dag (only contains related task)

+ 12 - 0
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsService.java

@@ -27,6 +27,8 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 
+import javax.annotation.Nullable;
+
 import lombok.NonNull;
 
 public interface CuringParamsService {
@@ -80,6 +82,16 @@ public interface CuringParamsService {
                                                   @NonNull AbstractParameters parameters,
                                                   @NonNull ProcessInstance processInstance);
 
+    /**
+     * Parse workflow star parameter
+     */
+    Map<String, Property> parseWorkflowStartParam(@Nullable Map<String, String> cmdParam);
+
+    /**
+     * Parse workflow father parameter
+     */
+    Map<String, Property> parseWorkflowFatherParam(@Nullable Map<String, String> cmdParam);
+
     /**
      * preBuildBusinessParams
      * @param processInstance

+ 26 - 1
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java

@@ -28,6 +28,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETE
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_WORKFLOW_DEFINITION_NAME;
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_WORKFLOW_INSTANCE_ID;
 
+import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.constants.DateConstants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -55,6 +56,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import javax.annotation.Nullable;
+
 import lombok.NonNull;
 
 import org.springframework.beans.factory.annotation.Autowired;
@@ -141,6 +144,28 @@ public class CuringParamsServiceImpl implements CuringParamsService {
         return JSONUtils.toJsonString(globalParamList);
     }
 
+    @Override
+    public Map<String, Property> parseWorkflowStartParam(@Nullable Map<String, String> cmdParam) {
+        if (cmdParam == null || !cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_START_PARAMS)) {
+            return new HashMap<>();
+        }
+        String startParamJson = cmdParam.get(CommandKeyConstants.CMD_PARAM_START_PARAMS);
+        Map<String, String> startParamMap = JSONUtils.toMap(startParamJson);
+        return startParamMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
+                entry -> new Property(entry.getKey(), Direct.IN, DataType.VARCHAR, entry.getValue())));
+    }
+
+    @Override
+    public Map<String, Property> parseWorkflowFatherParam(@Nullable Map<String, String> cmdParam) {
+        if (cmdParam == null || !cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_FATHER_PARAMS)) {
+            return new HashMap<>();
+        }
+        String startParamJson = cmdParam.get(CommandKeyConstants.CMD_PARAM_FATHER_PARAMS);
+        Map<String, String> startParamMap = JSONUtils.toMap(startParamJson);
+        return startParamMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
+                entry -> new Property(entry.getKey(), Direct.IN, DataType.VARCHAR, entry.getValue())));
+    }
+
     /**
      * the global parameters and local parameters used in the worker will be prepared here, and built-in parameters.
      *
@@ -199,7 +224,7 @@ public class CuringParamsServiceImpl implements CuringParamsService {
         }
 
         if (MapUtils.isNotEmpty(cmdParam)) {
-            prepareParamsMap.putAll(ParameterUtils.getUserDefParamsMap(cmdParam));
+            prepareParamsMap.putAll(parseWorkflowStartParam(cmdParam));
         }
 
         Iterator<Map.Entry<String, Property>> iter = prepareParamsMap.entrySet().iterator();

+ 3 - 0
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@@ -50,6 +50,7 @@ import org.apache.dolphinscheduler.service.exceptions.CronParseException;
 import org.apache.dolphinscheduler.service.model.TaskNode;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 import javax.annotation.Nullable;
@@ -194,4 +195,6 @@ public interface ProcessService {
     void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId);
 
     void saveCommandTrigger(Integer commandId, Integer processInstanceId);
+
+    void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam);
 }

+ 13 - 18
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@@ -28,8 +28,6 @@ import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.C
 import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
 import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
 import static org.apache.dolphinscheduler.common.constants.Constants.LOCAL_PARAMS;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
 import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
 
 import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
@@ -587,35 +585,32 @@ public class ProcessServiceImpl implements ProcessService {
         return processInstance;
     }
 
-    private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
+    @Override
+    public void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
+
         // get start params from command param
-        Map<String, String> startParamMap = new HashMap<>();
-        if (cmdParam != null && cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_START_PARAMS)) {
-            String startParamJson = cmdParam.get(CommandKeyConstants.CMD_PARAM_START_PARAMS);
-            startParamMap = JSONUtils.toMap(startParamJson);
-        }
-        Map<String, String> fatherParamMap = new HashMap<>();
-        if (cmdParam != null && cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_FATHER_PARAMS)) {
-            String fatherParamJson = cmdParam.get(CommandKeyConstants.CMD_PARAM_FATHER_PARAMS);
-            fatherParamMap = JSONUtils.toMap(fatherParamJson);
-        }
-        startParamMap.putAll(fatherParamMap);
+        Map<String, Property> fatherParam = curingGlobalParamsService.parseWorkflowFatherParam(cmdParam);
+        Map<String, Property> startParamMap = new HashMap<>(fatherParam);
+
+        Map<String, Property> currentStartParamMap = curingGlobalParamsService.parseWorkflowStartParam(cmdParam);
+        startParamMap.putAll(currentStartParamMap);
+
         // set start param into global params
         Map<String, String> globalMap = processDefinition.getGlobalParamMap();
         List<Property> globalParamList = processDefinition.getGlobalParamList();
         if (MapUtils.isNotEmpty(startParamMap) && globalMap != null) {
             // start param to overwrite global param
             for (Map.Entry<String, String> param : globalMap.entrySet()) {
-                String val = startParamMap.get(param.getKey());
+                String val = startParamMap.get(param.getKey()).getValue();
                 if (val != null) {
                     param.setValue(val);
                 }
             }
             // start param to create new global param if global not exist
-            for (Entry<String, String> startParam : startParamMap.entrySet()) {
+            for (Entry<String, Property> startParam : startParamMap.entrySet()) {
                 if (!globalMap.containsKey(startParam.getKey())) {
-                    globalMap.put(startParam.getKey(), startParam.getValue());
-                    globalParamList.add(new Property(startParam.getKey(), IN, VARCHAR, startParam.getValue()));
+                    globalMap.put(startParam.getKey(), startParam.getValue().getValue());
+                    globalParamList.add(startParam.getValue());
                 }
             }
         }

+ 44 - 0
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceTest.java

@@ -33,6 +33,8 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.SubProcessParameters;
 
+import org.apache.commons.collections4.MapUtils;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
@@ -234,4 +236,46 @@ public class CuringParamsServiceTest {
         Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_WORKFLOW_DEFINITION_CODE).getValue(),
                 String.valueOf(processDefinition.getCode()));
     }
+
+    @Test
+    public void testParseWorkflowStartParam() {
+        Map<String, Property> result = new HashMap<>();
+        // empty cmd param
+        Map<String, String> startParamMap = new HashMap<>();
+        result = dolphinSchedulerCuringGlobalParams.parseWorkflowStartParam(startParamMap);
+        Assertions.assertTrue(MapUtils.isEmpty(result));
+
+        // without key
+        startParamMap.put("testStartParam", "$[yyyyMMdd]");
+        result = dolphinSchedulerCuringGlobalParams.parseWorkflowStartParam(startParamMap);
+        Assertions.assertTrue(MapUtils.isEmpty(result));
+
+        startParamMap.put("StartParams", "{\"param1\":\"11111\", \"param2\":\"22222\"}");
+        result = dolphinSchedulerCuringGlobalParams.parseWorkflowStartParam(startParamMap);
+        Assertions.assertTrue(MapUtils.isNotEmpty(result));
+        Assertions.assertEquals(2, result.keySet().size());
+        Assertions.assertEquals("11111", result.get("param1").getValue());
+        Assertions.assertEquals("22222", result.get("param2").getValue());
+    }
+
+    @Test
+    public void testParseWorkflowFatherParam() {
+        Map<String, Property> result = new HashMap<>();
+        // empty cmd param
+        Map<String, String> startParamMap = new HashMap<>();
+        result = dolphinSchedulerCuringGlobalParams.parseWorkflowFatherParam(startParamMap);
+        Assertions.assertTrue(MapUtils.isEmpty(result));
+
+        // without key
+        startParamMap.put("testfatherParams", "$[yyyyMMdd]");
+        result = dolphinSchedulerCuringGlobalParams.parseWorkflowFatherParam(startParamMap);
+        Assertions.assertTrue(MapUtils.isEmpty(result));
+
+        startParamMap.put("fatherParams", "{\"param1\":\"11111\", \"param2\":\"22222\"}");
+        result = dolphinSchedulerCuringGlobalParams.parseWorkflowFatherParam(startParamMap);
+        Assertions.assertTrue(MapUtils.isNotEmpty(result));
+        Assertions.assertEquals(2, result.keySet().size());
+        Assertions.assertEquals("11111", result.get("param1").getValue());
+        Assertions.assertEquals("22222", result.get("param2").getValue());
+    }
 }