Browse Source

Merge pull request #528 from lenboo/dev-1.1.0

 update worker get task from queue
bao liang 5 years ago
parent
commit
bbf8f20657

+ 8 - 2
escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java

@@ -115,12 +115,18 @@ public class ProcessInstanceService extends BaseDAGService {
             return checkResult;
         }
         ProcessInstance processInstance = processDao.findProcessInstanceDetailById(processId);
+        String workerGroupName = "";
         if(processInstance.getWorkerGroupId() == -1){
-            processInstance.setWorkerGroupName(DEFAULT);
+            workerGroupName = DEFAULT;
         }else{
             WorkerGroup workerGroup = workerGroupMapper.queryById(processInstance.getWorkerGroupId());
-            processInstance.setWorkerGroupName(workerGroup.getName());
+            if(workerGroup != null){
+                workerGroupName = DEFAULT;
+            }else{
+                workerGroupName = workerGroup.getName();
+            }
         }
+        processInstance.setWorkerGroupName(workerGroupName);
         ProcessDefinition processDefinition = processDao.findProcessDefineById(processInstance.getProcessDefinitionId());
         processInstance.setReceivers(processDefinition.getReceivers());
         processInstance.setReceiversCc(processDefinition.getReceiversCc());

+ 24 - 7
escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java

@@ -22,6 +22,7 @@ import cn.escheduler.common.utils.Bytes;
 import cn.escheduler.common.utils.IpUtils;
 import cn.escheduler.common.utils.OSUtils;
 import cn.escheduler.common.zk.AbstractZKClient;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.data.Stat;
@@ -157,7 +158,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
                     String taskDetail = list.get(i);
                     String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE);
 
-                    //向前版本兼容
+                    //向前版本兼ProcessInstanceService
                     if(taskDetailArrs.length >= 4){
 
                         //format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
@@ -209,17 +210,33 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
         while(iterator.hasNext()){
             if(j++ < tasksNum){
                 String task = iterator.next();
-                String[] taskArray = task.split(Constants.UNDERLINE);
-                int processInstanceId = Integer.parseInt(taskArray[1]);
-                int taskId = Integer.parseInt(taskArray[3]);
-                String destTask = taskArray[0]+Constants.UNDERLINE + processInstanceId + Constants.UNDERLINE
-                        + taskArray[2] + Constants.UNDERLINE + taskId;
-                taskslist.add(destTask);
+
+                taskslist.add(getOriginTaskFormat(task));
             }
         }
         return taskslist;
     }
 
+    /**
+     * format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
+     * processInstanceId and task id need to be convert to int.
+     * @param formatTask
+     * @return
+     */
+    private String getOriginTaskFormat(String formatTask){
+        String[] taskArray = formatTask.split(Constants.UNDERLINE);
+        int processInstanceId = Integer.parseInt(taskArray[1]);
+        int taskId = Integer.parseInt(taskArray[3]);
+        String suffix = "";
+        for(int index =4; index < taskArray.length; index++){
+            suffix += taskArray[index] + Constants.UNDERLINE;
+        }
+        String destTask = String.format("%s_%s_%s_%s", taskArray[0], processInstanceId, taskArray[3], taskId);
+        if(StringUtils.isNotEmpty(suffix)){
+            destTask += Constants.UNDERLINE + suffix;
+        }
+        return destTask;
+    }
 
     @Override
     public void removeNode(String key, String nodeValue){

+ 2 - 1
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java

@@ -493,7 +493,8 @@ public class ProcessDao extends AbstractBaseDao {
         processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson());
         // set process instance priority
         processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
-        processInstance.setWorkerGroupId(command.getWorkerGroupId());
+        int workerGroupId = command.getWorkerGroupId() == 0 ? -1 : command.getWorkerGroupId();
+        processInstance.setWorkerGroupId(workerGroupId);
         processInstance.setTimeout(processDefinition.getTimeout());
         processInstance.setTenantId(processDefinition.getTenantId());
         return processInstance;

+ 1 - 1
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java

@@ -153,7 +153,7 @@ public class FetchTaskThread implements Runnable{
                                 }
 
                                 String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE);
-                                String taskInstIdStr = taskStringArray[taskStringArray.length - 1];
+                                String taskInstIdStr = taskStringArray[3];
                                 Date now = new Date();
                                 Integer taskId = Integer.parseInt(taskInstIdStr);