Forráskód Böngészése

[Feature-#3805][server-master] Task parameter transfer (#5077)


* fix out param format bug

Co-authored-by: wangxj <wangxj31>
wangxj3 4 éve
szülő
commit
29d42fd92d

+ 2 - 0
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@@ -469,6 +469,8 @@ public final class Constants {
 
     public static final String CMD_PARAM_START_PARAMS = "StartParams";
 
+    public static final String CMD_PARAM_FATHER_PARAMS = "fatherParams";
+
     /**
      * complement data start date
      */

+ 1 - 12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@@ -557,7 +557,7 @@ public class MasterExecThread implements Runnable {
     private void setProcessGlobal(TaskNode taskNode, TaskInstance taskInstance) {
         String globalParams = this.processInstance.getGlobalParams();
         if (StringUtils.isNotEmpty(globalParams)) {
-            Map<String, String> globalMap = getGlobalParamMap(globalParams);
+            Map<String, String> globalMap = processService.getGlobalParamMap(globalParams);
             if (globalMap != null && globalMap.size() != 0) {
                 setGlobalMapToTask(taskNode, taskInstance, globalMap);
             }
@@ -586,17 +586,6 @@ public class MasterExecThread implements Runnable {
         }
     }
 
-    public Map<String, String> getGlobalParamMap(String globalParams) {
-        List<Property> propList;
-        Map<String,String> globalParamMap = new HashMap<>();
-        if (StringUtils.isNotEmpty(globalParams)) {
-            propList = JSONUtils.toList(globalParams, Property.class);
-            globalParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
-        }
-
-        return globalParamMap;
-    }
-
     private void submitPostNode(String parentNodeName) {
         Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList);
         List<TaskInstance> taskInstances = new ArrayList<>();

+ 38 - 8
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.service.process;
 import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
 import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
@@ -586,14 +587,19 @@ public class ProcessService {
 
     private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
         // get start params from command param
-        Map<String, String> startParamMap = null;
+        Map<String, String> startParamMap = new HashMap<>();
         if (cmdParam != null && cmdParam.containsKey(Constants.CMD_PARAM_START_PARAMS)) {
             String startParamJson = cmdParam.get(Constants.CMD_PARAM_START_PARAMS);
             startParamMap = JSONUtils.toMap(startParamJson);
         }
-
+        Map<String, String> fatherParamMap = new HashMap<>();
+        if (cmdParam != null && cmdParam.containsKey(Constants.CMD_PARAM_FATHER_PARAMS)) {
+            String fatherParamJson = cmdParam.get(Constants.CMD_PARAM_FATHER_PARAMS);
+            fatherParamMap = JSONUtils.toMap(fatherParamJson);
+        }
+        startParamMap.putAll(fatherParamMap);
         // set start param into global params
-        if (startParamMap != null && startParamMap.size() > 0
+        if (startParamMap.size() > 0
                 && processDefinition.getGlobalParamMap() != null) {
             for (Map.Entry<String, String> param : processDefinition.getGlobalParamMap().entrySet()) {
                 String val = startParamMap.get(param.getKey());
@@ -1065,7 +1071,7 @@ public class ProcessService {
     /**
      * complement data needs transform parent parameter to child.
      */
-    private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance) {
+    private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance,Map<String,String> fatherParams) {
         // set sub work process command
         String processMapStr = JSONUtils.toJsonString(instanceMap);
         Map<String, String> cmdParam = JSONUtils.toMap(processMapStr);
@@ -1077,9 +1083,24 @@ public class ProcessService {
             cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime);
             processMapStr = JSONUtils.toJsonString(cmdParam);
         }
+        if (fatherParams.size() != 0) {
+            cmdParam.put(CMD_PARAM_FATHER_PARAMS, JSONUtils.toJsonString(fatherParams));
+            processMapStr = JSONUtils.toJsonString(cmdParam);
+        }
         return processMapStr;
     }
 
+    public Map<String, String> getGlobalParamMap(String globalParams) {
+        List<Property> propList;
+        Map<String, String> globalParamMap = new HashMap<>();
+        if (StringUtils.isNotEmpty(globalParams)) {
+            propList = JSONUtils.toList(globalParams, Property.class);
+            globalParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
+        }
+
+        return globalParamMap;
+    }
+
     /**
      * create sub work process command
      */
@@ -1089,9 +1110,18 @@ public class ProcessService {
                                            TaskInstance task) {
         CommandType commandType = getSubCommandType(parentProcessInstance, childInstance);
         TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
-        Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
-        Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID));
-        String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance);
+        Map<String, Object> subProcessParam = JSONUtils.toMap(taskNode.getParams(), String.class, Object.class);
+        Integer childDefineId = Integer.parseInt(String.valueOf(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID)));
+        Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS);
+        List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
+        Map<String, String> globalMap = this.getGlobalParamMap(parentProcessInstance.getGlobalParams());
+        Map<String,String> fatherParams = new HashMap<>();
+        if (CollectionUtils.isNotEmpty(allParam)) {
+            for (Property info : allParam) {
+                fatherParams.put(info.getProp(), globalMap.get(info.getProp()));
+            }
+        }
+        String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance,fatherParams);
 
         return new Command(
                 commandType,
@@ -1601,7 +1631,7 @@ public class ProcessService {
                 if (property == null) {
                     continue;
                 }
-                String value = row.get(paramName);
+                String value = String.valueOf(row.get(paramName));
                 if (StringUtils.isNotEmpty(value)) {
                     property.setValue(value);
                     info.setValue(value);