Sfoglia il codice sorgente

[Improvement-9227][master]implement use the slot to scan the database (#9228)

when the master assigns tasks by slot,implement use the slot to scan the database.

This closes #9227
worry 3 anni fa
parent
commit
d3251c9bcc

+ 6 - 0
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java

@@ -52,4 +52,10 @@ public interface CommandMapper extends BaseMapper<Command> {
      */
     List<Command> queryCommandPage(@Param("limit") int limit, @Param("offset") int offset);
 
+
+    /**
+     * query command page by slot
+     * @return command list
+     */
+    List<Command> queryCommandPageBySlot(@Param("limit") int limit, @Param("offset") int offset, @Param("masterCount") int masterCount, @Param("thisMasterSlot") int thisMasterSlot);
 }

+ 8 - 0
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml

@@ -39,4 +39,12 @@
         order by process_instance_priority, id asc
         limit #{limit} offset #{offset}
     </select>
+
+    <select id="queryCommandPageBySlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">
+        select *
+        from t_ds_command
+        where id % #{masterCount} = #{thisMasterSlot}
+        order by process_instance_priority, id asc
+            limit #{limit} offset #{offset}
+    </select>
 </mapper>

+ 33 - 0
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java

@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.dao.mapper;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
@@ -166,6 +167,38 @@ public class CommandMapperTest extends BaseDaoTest {
         assertThat(actualCommandCounts.size(),greaterThanOrEqualTo(1));
     }
 
+    /**
+     * test query command page by slot
+     */
+    @Test
+    public void testQueryCommandPageBySlot() {
+        int masterCount = 4;
+        int thisMasterSlot = 2;
+        // for hit or miss
+        toTestQueryCommandPageBySlot(masterCount,thisMasterSlot);
+        toTestQueryCommandPageBySlot(masterCount,thisMasterSlot);
+        toTestQueryCommandPageBySlot(masterCount,thisMasterSlot);
+        toTestQueryCommandPageBySlot(masterCount,thisMasterSlot);
+    }
+
+    private boolean toTestQueryCommandPageBySlot(int masterCount, int thisMasterSlot) {
+        Command command = createCommand();
+        int id = command.getId();
+        boolean hit = id % masterCount == thisMasterSlot;
+        List<Command> commandList = commandMapper.queryCommandPageBySlot(1, 0, masterCount, thisMasterSlot);
+        if (hit) {
+            assertEquals(id,commandList.get(0).getId());
+        } else {
+            commandList.forEach(o -> {
+                assertNotEquals(id, o.getId());
+                assertEquals(thisMasterSlot, o.getId() % masterCount);
+            });
+        }
+        return hit;
+    }
+
+
+
     /**
      * create command map
      * @param count map count

+ 5 - 20
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@@ -226,27 +226,12 @@ public class MasterSchedulerService extends Thread {
         int pageNumber = 0;
         int pageSize = masterConfig.getFetchCommandNum();
         List<Command> result = new ArrayList<>();
-        while (Stopper.isRunning()) {
-            // todo: Can we use the slot to scan database?
-            List<Command> commandList = processService.findCommandPage(pageSize, pageNumber);
-            if (commandList.size() == 0) {
-                return result;
-            }
-            for (Command command : commandList) {
-                SlotCheckState slotCheckState = slotCheck(command);
-                if (slotCheckState.equals(SlotCheckState.CHANGE)) {
-                    // return and wait next scan, don't reset param, waste resources of cpu
-                    return new ArrayList<>();
-                }
-                if (slotCheckState.equals(SlotCheckState.PASS)) {
-                    result.add(command);
-                }
-            }
-            if (CollectionUtils.isNotEmpty(result)) {
-                logger.info("find {} commands, slot:{}", result.size(), ServerNodeManager.getSlot());
-                break;
+        if (Stopper.isRunning()) {
+            int thisMasterSlot = ServerNodeManager.getSlot();
+            int masterCount = ServerNodeManager.getMasterSize();
+            if (masterCount > 0) {
+                result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
             }
-            pageNumber += 1;
         }
         return result;
     }

+ 10 - 0
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@@ -407,6 +407,16 @@ public class ProcessService {
         return commandMapper.queryCommandPage(pageSize, pageNumber * pageSize);
     }
 
+    /**
+     * get command page
+     */
+    public List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot) {
+        if (masterCount <= 0) {
+            return Lists.newArrayList();
+        }
+        return commandMapper.queryCommandPageBySlot(pageSize, pageNumber * pageSize, masterCount, thisMasterSlot);
+    }
+
     /**
      * check the input command exists in queue list
      *

+ 10 - 0
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@@ -844,6 +844,16 @@ public class ProcessServiceTest {
         Assert.assertEquals(instance.getId(), taskInstanceByIdList.get(0).getId());
     }
 
+    @Test
+    public void testFindCommandPageBySlot() {
+        int pageSize = 1;
+        int pageNumber = 0;
+        int masterCount = 0;
+        int thisMasterSlot = 2;
+        List<Command> commandList = processService.findCommandPageBySlot(pageSize,pageNumber,masterCount,thisMasterSlot);
+        Assert.assertEquals(0,commandList.size());
+    }
+
     private TaskGroupQueue getTaskGroupQueue() {
         TaskGroupQueue taskGroupQueue = new TaskGroupQueue();
         taskGroupQueue.setTaskName("task name");