Ver código fonte

Fix master cluster may loop command unbalanced (#12891)

(cherry picked from commit 3b2b86661be76b7c1404a910c865d78b7936313d)
Wenjun Ruan 2 anos atrás
pai
commit
d99ba29b66

+ 1 - 1
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java

@@ -54,7 +54,7 @@ public interface CommandMapper extends BaseMapper<Command> {
      * query command page by slot
      * @return command list
      */
-    List<Command> queryCommandPageBySlot(@Param("limit") int limit, @Param("offset") int offset,
+    List<Command> queryCommandPageBySlot(@Param("limit") int limit,
                                          @Param("masterCount") int masterCount,
                                          @Param("thisMasterSlot") int thisMasterSlot);
 }

+ 1 - 1
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml

@@ -45,6 +45,6 @@
         from t_ds_command
         where id % #{masterCount} = #{thisMasterSlot}
         order by process_instance_priority, id asc
-            limit #{limit} offset #{offset}
+            limit #{limit}
     </select>
 </mapper>

+ 1 - 1
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java

@@ -176,7 +176,7 @@ public class CommandMapperTest extends BaseDaoTest {
         Command command = createCommand();
         Integer id = command.getId();
         boolean hit = id % masterCount == thisMasterSlot;
-        List<Command> commandList = commandMapper.queryCommandPageBySlot(1, 0, masterCount, thisMasterSlot);
+        List<Command> commandList = commandMapper.queryCommandPageBySlot(1, masterCount, thisMasterSlot);
         if (hit) {
             Assertions.assertEquals(id, commandList.get(0).getId());
         } else {

+ 5 - 5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java

@@ -269,16 +269,16 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
                 logger.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot);
                 return Collections.emptyList();
             }
-            int pageNumber = 0;
             int pageSize = masterConfig.getFetchCommandNum();
             final List<Command> result =
-                    commandService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
+                    commandService.findCommandPageBySlot(pageSize, masterCount, thisMasterSlot);
             if (CollectionUtils.isNotEmpty(result)) {
+                long cost = System.currentTimeMillis() - scheduleStartTime;
                 logger.info(
-                        "Master schedule bootstrap loop command success, command size: {}, current slot: {}, total slot size: {}",
-                        result.size(), thisMasterSlot, masterCount);
+                        "Master schedule bootstrap loop command success, fetch command size: {}, cost: {}ms, current slot: {}, total slot size: {}",
+                        result.size(), cost, thisMasterSlot, masterCount);
+                ProcessInstanceMetrics.recordCommandQueryTime(cost);
             }
-            ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() - scheduleStartTime);
             return result;
         } catch (Exception ex) {
             throw new MasterException("Master loop command from database error", ex);

+ 1 - 2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java

@@ -47,12 +47,11 @@ public interface CommandService {
     /**
      * Get command page
      * @param pageSize page size
-     * @param pageNumber page number
      * @param masterCount master count
      * @param thisMasterSlot master slot
      * @return command page
      */
-    List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot);
+    List<Command> findCommandPageBySlot(int pageSize, int masterCount, int thisMasterSlot);
 
     /**
      * check the input command exists in queue list

+ 2 - 2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java

@@ -109,11 +109,11 @@ public class CommandServiceImpl implements CommandService {
     }
 
     @Override
-    public List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot) {
+    public List<Command> findCommandPageBySlot(int pageSize, int masterCount, int thisMasterSlot) {
         if (masterCount <= 0) {
             return Lists.newArrayList();
         }
-        return commandMapper.queryCommandPageBySlot(pageSize, pageNumber * pageSize, masterCount, thisMasterSlot);
+        return commandMapper.queryCommandPageBySlot(pageSize, masterCount, thisMasterSlot);
     }
 
     @Override

+ 1 - 2
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/CommandServiceImplTest.java

@@ -217,11 +217,10 @@ class CommandServiceImplTest {
     @Test
     public void testFindCommandPageBySlot() {
         int pageSize = 1;
-        int pageNumber = 0;
         int masterCount = 0;
         int thisMasterSlot = 2;
         List<Command> commandList =
-                commandService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
+                commandService.findCommandPageBySlot(pageSize, masterCount, thisMasterSlot);
         Assertions.assertEquals(0, commandList.size());
     }