Просмотр исходного кода

[Improvement][api] Optimize batch query in process-list interface (#13222)

Co-authored-by: chenjiaming <chenjiaming@kezaihui.com>
陈家名 2 лет назад
Родитель
Сommit
8870464c02

+ 10 - 10
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@@ -96,6 +96,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.entity.UserWithProcessDefinitionCode;
 import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
@@ -162,7 +163,6 @@ import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.web.multipart.MultipartFile;
 
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -593,16 +593,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         Map<Long, Schedule> scheduleMap = schedulerService.queryScheduleByProcessDefinitionCodes(processDefinitionCodes)
                 .stream()
                 .collect(Collectors.toMap(Schedule::getProcessDefinitionCode, Function.identity()));
-
-        Map<Integer, String> userMap = userMapper.selectList(new QueryWrapper<>()).stream()
-                .collect(Collectors.toMap(User::getId, User::getUserName));
-
+        List<UserWithProcessDefinitionCode> userWithCodes = userMapper.queryUserWithProcessDefinitionCode(
+                processDefinitionCodes);
         for (ProcessDefinition pd : processDefinitions) {
-            // todo: use batch query
-            ProcessDefinitionLog processDefinitionLog =
-                    processDefinitionLogMapper.queryByDefinitionCodeAndVersion(pd.getCode(), pd.getVersion());
-            pd.setModifyBy(userMap.get(processDefinitionLog.getOperator()));
-            pd.setUserName(userMap.get(pd.getUserId()));
+            userWithCodes.stream()
+                    .filter(userWithCode -> userWithCode.getProcessDefinitionCode() == pd.getCode()
+                            && userWithCode.getProcessDefinitionVersion() == pd.getVersion())
+                    .findAny().ifPresent(userWithCode -> {
+                        pd.setModifyBy(userWithCode.getModifierName());
+                        pd.setUserName(userWithCode.getCreatorName());
+                    });
             Schedule schedule = scheduleMap.get(pd.getCode());
             pd.setScheduleReleaseState(schedule == null ? null : schedule.getReleaseState());
         }

+ 33 - 4
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@@ -59,6 +59,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.entity.UserWithProcessDefinitionCode;
 import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
@@ -228,7 +229,6 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
     @Test
     public void testQueryProcessDefinitionListPaging() {
         Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
-        Mockito.when(userMapper.selectList(Mockito.any())).thenReturn(Lists.newArrayList());
 
         Project project = getProject(projectCode);
 
@@ -248,8 +248,15 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
         Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, project,
                 WORKFLOW_DEFINITION);
         Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
+        long processDefinitionCode1 = 1L;
+        long processDefinitionCode2 = 2L;
+        List<ProcessDefinition> processDefinitions = Arrays.asList(
+                ProcessDefinition.builder().version(1).code(processDefinitionCode1).build(),
+                ProcessDefinition.builder().version(1).code(processDefinitionCode2).build());
+        List<Long> processDefinitionCodes = processDefinitions.stream()
+                .map(ProcessDefinition::getCode).collect(Collectors.toList());
         PageListingResult<ProcessDefinition> pageListingResult = PageListingResult.<ProcessDefinition>builder()
-                .records(Collections.emptyList())
+                .records(processDefinitions)
                 .currentPage(1)
                 .pageSize(10)
                 .totalCount(30)
@@ -260,11 +267,33 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
                 Mockito.eq(""),
                 Mockito.eq(1),
                 Mockito.eq(project.getCode()))).thenReturn(pageListingResult);
-
+        String user1 = "user1";
+        String user2 = "user2";
+        Mockito.when(userMapper.queryUserWithProcessDefinitionCode(processDefinitionCodes))
+                .thenReturn(Arrays.asList(
+                        UserWithProcessDefinitionCode.builder()
+                                .processDefinitionCode(processDefinitionCode1)
+                                .processDefinitionVersion(1)
+                                .modifierName(user1).build(),
+                        UserWithProcessDefinitionCode.builder()
+                                .processDefinitionCode(processDefinitionCode2)
+                                .processDefinitionVersion(1)
+                                .modifierName(user2).build()));
+        Schedule schedule1 = new Schedule();
+        schedule1.setProcessDefinitionCode(processDefinitionCode1);
+        schedule1.setReleaseState(ReleaseState.ONLINE);
+        Schedule schedule2 = new Schedule();
+        schedule2.setProcessDefinitionCode(processDefinitionCode2);
+        schedule2.setReleaseState(ReleaseState.ONLINE);
+        Mockito.when(schedulerService.queryScheduleByProcessDefinitionCodes(processDefinitionCodes))
+                .thenReturn(Arrays.asList(schedule1, schedule2));
         PageInfo<ProcessDefinition> pageInfo = processDefinitionService.queryProcessDefinitionListPaging(
                 user, project.getCode(), "", "", 1, 0, 10);
-
         Assertions.assertNotNull(pageInfo);
+        ProcessDefinition pd1 = pageInfo.getTotalList().stream()
+                .filter(pd -> pd.getCode() == processDefinitionCode1).findFirst().orElse(null);
+        assert pd1 != null;
+        Assertions.assertEquals(pd1.getModifyBy(), user1);
     }
 
     @Test

+ 42 - 0
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/UserWithProcessDefinitionCode.java

@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.entity;
+
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * User and task flow binding relationship
+ */
+@Data
+@Builder
+public class UserWithProcessDefinitionCode {
+
+    private long processDefinitionCode;
+
+    private int processDefinitionVersion;
+
+    private Integer modifierId;
+
+    private String modifierName;
+
+    private Integer creatorId;
+
+    private String creatorName;
+
+}

+ 10 - 0
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java

@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.dao.mapper;
 
 import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.entity.UserWithProcessDefinitionCode;
 
 import org.apache.ibatis.annotations.Param;
 
@@ -177,4 +178,13 @@ public interface UserMapper extends BaseMapper<User> {
      * @return
      */
     List<User> queryEnabledUsers();
+
+    /**
+     * query User and task flow binding relationship
+     *
+     * @param processDefinitionCodes processDefinitionCodes
+     * @return user with process definition code
+     */
+    List<UserWithProcessDefinitionCode> queryUserWithProcessDefinitionCode(@Param("processDefinitionCodes") List<Long> processDefinitionCodes);
+
 }

+ 23 - 0
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UserMapper.xml

@@ -143,4 +143,27 @@
         from t_ds_user
         where state = 1
     </select>
+    <select id="queryUserWithProcessDefinitionCode" resultType="org.apache.dolphinscheduler.dao.entity.UserWithProcessDefinitionCode">
+        select
+            dl.code as process_definition_code,
+            pd.version as process_definition_version,
+            u.id as modifier_id,
+            u.user_name as modifier_name,
+            u2.id as creator_id,
+            u2.user_name as creator_name
+        from t_ds_process_definition_log dl
+        inner join t_ds_process_definition pd
+        on pd.code = dl.code
+        and pd.version = dl.version
+        inner join t_ds_user u
+        on dl.operator = u.id
+        inner join t_ds_user u2
+        on pd.user_id = u2.id
+        <if test="processDefinitionCodes != null and processDefinitionCodes.size() != 0">
+            where dl.code in
+            <foreach item="code" collection="processDefinitionCodes" open="(" separator="," close=")">
+                #{code}
+            </foreach>
+        </if>
+    </select>
 </mapper>

+ 54 - 0
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java

@@ -22,9 +22,12 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.dao.BaseDaoTest;
 import org.apache.dolphinscheduler.dao.entity.AccessToken;
 import org.apache.dolphinscheduler.dao.entity.AlertGroup;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.Queue;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.entity.UserWithProcessDefinitionCode;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -54,6 +57,12 @@ public class UserMapperTest extends BaseDaoTest {
     @Autowired
     private QueueMapper queueMapper;
 
+    @Autowired
+    private ProcessDefinitionMapper processDefinitionMapper;
+
+    @Autowired
+    private ProcessDefinitionLogMapper processDefinitionLogMapper;
+
     /**
      * insert one user
      *
@@ -320,4 +329,49 @@ public class UserMapperTest extends BaseDaoTest {
         insertOne();
         Assertions.assertTrue(userMapper.existUser(queueName));
     }
+
+    @Test
+    public void testQueryUserWithProcessDefinitionCode() {
+        User user = insertOne();
+        insertProcessDefinition(user.getId());
+        ProcessDefinitionLog log = insertProcessDefinitionLog(user.getId());
+        long processDefinitionCode = log.getCode();
+        List<UserWithProcessDefinitionCode> userWithCodes = userMapper.queryUserWithProcessDefinitionCode(
+                null);
+        UserWithProcessDefinitionCode userWithCode = userWithCodes.stream()
+                .filter(code -> code.getProcessDefinitionCode() == processDefinitionCode)
+                .findAny().orElse(null);
+        assert userWithCode != null;
+        Assertions.assertEquals(userWithCode.getCreatorId(), user.getId());
+    }
+
+    private ProcessDefinitionLog insertProcessDefinitionLog(int operator) {
+        // insertOne
+        ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog();
+        processDefinitionLog.setCode(199L);
+        processDefinitionLog.setName("def 1");
+        processDefinitionLog.setProjectCode(1L);
+        processDefinitionLog.setUserId(operator);
+        processDefinitionLog.setVersion(10);
+        processDefinitionLog.setUpdateTime(new Date());
+        processDefinitionLog.setCreateTime(new Date());
+        processDefinitionLog.setOperator(operator);
+        processDefinitionLogMapper.insert(processDefinitionLog);
+        return processDefinitionLog;
+    }
+
+    private ProcessDefinition insertProcessDefinition(int operator) {
+        // insertOne
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setCode(199L);
+        processDefinition.setName("process-name");
+        processDefinition.setProjectCode(1010L);
+        processDefinition.setVersion(10);
+        processDefinition.setUserId(operator);
+        processDefinition.setUpdateTime(new Date());
+        processDefinition.setCreateTime(new Date());
+        processDefinitionMapper.insert(processDefinition);
+        return processDefinition;
+    }
+
 }