Selaa lähdekoodia

process definition json worker group convert #2794 (#2809)

* add LoggerServerTest UT

* add LoggerServerTest UT

* add LoggerServerTest UT
add RemoveTaskLogRequestCommandTest UT
add RemoveTaskLogResponseCommandTest

* master select worker filter high load worker #2704

* master select worker filter high load worker #2704

* master select worker filter high load worker #2704

* master select worker filter high load worker #2704

* master select worker filter high load worker #2704

* master select worker filter high load worker #2704

* add not worker log and remove worker invalid property

* process definition json worker group convert #2794

* process definition json worker group convert #2794

* process definition json worker group convert #2794

* process definition json worker group convert #2794

* process definition json worker group convert #2794

* process definition json worker group convert #2794

* process definition json worker group convert #2794

* process definition json worker group convert #2794

Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
qiaozhanwei 4 vuotta sitten
vanhempi
commit
d0fbfeaf17

+ 17 - 9
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java

@@ -16,6 +16,8 @@
  */
 package org.apache.dolphinscheduler.dao.upgrade;
 
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
 import org.apache.dolphinscheduler.common.enums.DbType;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.utils.*;
@@ -274,18 +276,24 @@ public abstract class UpgradeDao extends AbstractBaseDao {
             Map<Integer,String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
 
             for (Map.Entry<Integer,String> entry : processDefinitionJsonMap.entrySet()){
-                ProcessData processData = JSONUtils.parseObject(entry.getValue(), ProcessData.class);
-
-                List<TaskNode> tasks = processData.getTasks();
-                for (TaskNode taskNode : tasks){
-                    Integer workerGroupId = taskNode.getWorkerGroupId();
-                    if (workerGroupId == -1){
-                        taskNode.setWorkerGroup("default");
+                JSONObject jsonObject = JSONObject.parseObject(entry.getValue());
+                JSONArray tasks = JSONArray.parseArray(jsonObject.getString("tasks"));
+
+                for (int i = 0 ;i < tasks.size() ; i++){
+                    JSONObject task = tasks.getJSONObject(i);
+                    Integer workerGroupId = task.getInteger("workerGroupId");
+                    if (workerGroupId == -1) {
+                        task.put("workerGroup", "default");
                     }else {
-                        taskNode.setWorkerGroup(oldWorkerGroupMap.get(workerGroupId));
+                        task.put("workerGroup", oldWorkerGroupMap.get(workerGroupId));
                     }
                 }
-                replaceProcessDefinitionMap.put(entry.getKey(),JSONUtils.toJson(processData));
+
+                jsonObject.remove(jsonObject.getString("tasks"));
+
+                jsonObject.put("tasks",tasks);
+
+                replaceProcessDefinitionMap.put(entry.getKey(),jsonObject.toJSONString());
             }
             if (replaceProcessDefinitionMap.size() > 0){
                 processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),replaceProcessDefinitionMap);