Bläddra i källkod

Merge pull request #537 from lenboo/dev-1.1.0

 update worker task queue
bao liang 5 år sedan
förälder
incheckning
a0824991f4

+ 23 - 3
escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java

@@ -151,7 +151,27 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
                 int size = list.size();
 
 
-                Set<String> taskTreeSet = new TreeSet<>();
+                Set<String> taskTreeSet = new TreeSet<>(new Comparator<String>() {
+                    @Override
+                    public int compare(String o1, String o2) {
+
+                        String s1 = o1;
+                        String s2 = o2;
+                        String[] s1Array = s1.split(Constants.UNDERLINE);
+                        if(s1Array.length>4){
+                            // warning: if this length > 5, need to be changed
+                            s1 = s1.substring(0, s1.lastIndexOf(Constants.UNDERLINE) );
+                        }
+
+                        String[] s2Array = s2.split(Constants.UNDERLINE);
+                        if(s2Array.length>4){
+                            // warning: if this length > 5, need to be changed
+                            s2 = s2.substring(0, s2.lastIndexOf(Constants.UNDERLINE) );
+                        }
+
+                        return s1.compareTo(s2);
+                    }
+                });
 
                 for (int i = 0; i < size; i++) {
 
@@ -173,8 +193,8 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
                                     continue;
                                 }
                             }
+                            formatTask += Constants.UNDERLINE + taskDetailArrs[4];
                         }
-
                         taskTreeSet.add(formatTask);
 
                     }
@@ -229,7 +249,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
         int taskId = Integer.parseInt(taskArray[3]);
 
         StringBuilder sb = new StringBuilder(50);
-        String destTask = String.format("%s_%s_%s_%s", taskArray[0], processInstanceId, taskArray[3], taskId);
+        String destTask = String.format("%s_%s_%s_%s", taskArray[0], processInstanceId, taskArray[2], taskId);
 
         sb.append(destTask);
 

+ 8 - 6
escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java

@@ -17,6 +17,8 @@
 package cn.escheduler.common.queue;
 
 import cn.escheduler.common.Constants;
+import cn.escheduler.common.utils.IpUtils;
+import cn.escheduler.common.utils.OSUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -58,31 +60,31 @@ public class TaskQueueImplTest {
     @Test
     public void testAdd(){
 
+
         //add
-        tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_1_1_2130706433,3232236775");
+        tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_0_1_1_-1");
         tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"0_1_1_1_2130706433,3232236775");
-        tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_0_1_2130706433,3232236775");
+        tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_0_1_2130706433,3232236775,"+IpUtils.ipToLong(OSUtils.getHost()));
         tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_2_1_1_2130706433,3232236775");
 
         List<String> tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1);
 
-        if(tasks.size() < 0){
+        if(tasks.size() <= 0){
             return;
         }
 
         //pop
         String node1 = tasks.get(0);
 
-        assertEquals(node1,"0_0000000001_1_0000000001");
+        assertEquals(node1,"1_0_1_1_-1");
 
         tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1);
 
-        if(tasks.size() < 0){
+        if(tasks.size() <= 0){
             return;
         }
 
         String node2 = tasks.get(0);
-        assertEquals(node2,"0_0000000001_1_0000000001");
 
     }