Browse Source

Modify Queue already add to User not change bug -- fix #1514 (#1517)

* add Spark Version in Spark Component

add Spark Version in Spark Component

* add license for SparkVersion.class

add license

* 1 add spark task UT
2 add spark version param check

* add assert check for sparkTaskTest

* fix bug:user queue field does not change with queue table modify

* modify check queue in using method name and updateUserByQueue method param name

modify check queue in using method name and updateUserByQueue method param name
Yelli 5 years ago
parent
commit
66217be990

+ 42 - 16
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java

@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.dao.mapper.QueueMapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.dao.mapper.UserMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -47,6 +48,9 @@ public class QueueService extends BaseService {
     @Autowired
     private QueueMapper queueMapper;
 
+    @Autowired
+    private UserMapper userMapper;
+
     /**
      * query queue list
      *
@@ -70,9 +74,9 @@ public class QueueService extends BaseService {
      * query queue list paging
      *
      * @param loginUser login user
-     * @param pageNo page number
+     * @param pageNo    page number
      * @param searchVal search value
-     * @param pageSize page size
+     * @param pageSize  page size
      * @return queue list
      */
     public Map<String, Object> queryList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
@@ -86,7 +90,7 @@ public class QueueService extends BaseService {
 
         IPage<Queue> queueList = queueMapper.queryQueuePaging(page, searchVal);
 
-        Integer count = (int)queueList.getTotal();
+        Integer count = (int) queueList.getTotal();
         PageInfo<Queue> pageInfo = new PageInfo<>(pageNo, pageSize);
         pageInfo.setTotalCount(count);
         pageInfo.setLists(queueList.getRecords());
@@ -100,7 +104,7 @@ public class QueueService extends BaseService {
      * create queue
      *
      * @param loginUser login user
-     * @param queue queue
+     * @param queue     queue
      * @param queueName queue name
      * @return create result
      */
@@ -110,12 +114,12 @@ public class QueueService extends BaseService {
             return result;
         }
 
-        if(StringUtils.isEmpty(queue)){
+        if (StringUtils.isEmpty(queue)) {
             putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, queue);
             return result;
         }
 
-        if(StringUtils.isEmpty(queueName)){
+        if (StringUtils.isEmpty(queueName)) {
             putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, queueName);
             return result;
         }
@@ -148,8 +152,8 @@ public class QueueService extends BaseService {
      * update queue
      *
      * @param loginUser login user
-     * @param queue queue
-     * @param id queue id
+     * @param queue     queue
+     * @param id        queue id
      * @param queueName queue name
      * @return update result code
      */
@@ -173,7 +177,7 @@ public class QueueService extends BaseService {
 
         // check queue name is exist
         if (!queueName.equals(queueObj.getQueueName())) {
-            if(checkQueueNameExist(queueName)){
+            if (checkQueueNameExist(queueName)) {
                 putMsg(result, Status.QUEUE_NAME_EXIST, queueName);
                 return result;
             }
@@ -181,12 +185,19 @@ public class QueueService extends BaseService {
 
         // check queue value is exist
         if (!queue.equals(queueObj.getQueue())) {
-            if(checkQueueExist(queue)){
+            if (checkQueueExist(queue)) {
                 putMsg(result, Status.QUEUE_VALUE_EXIST, queue);
                 return result;
             }
         }
 
+        // check old queue using by any user
+        if (checkIfQueueIsInUsing(queueObj.getQueueName(), queueName)) {
+            //update user related old queue
+            Integer relatedUserNums = userMapper.updateUserQueue(queueObj.getQueueName(), queueName);
+            logger.info("old queue have related {} user, exec update user success.", relatedUserNums);
+        }
+
         // update queue
         Date now = new Date();
         queueObj.setQueue(queue);
@@ -194,6 +205,7 @@ public class QueueService extends BaseService {
         queueObj.setUpdateTime(now);
 
         queueMapper.updateById(queueObj);
+
         putMsg(result, Status.SUCCESS);
 
         return result;
@@ -202,12 +214,12 @@ public class QueueService extends BaseService {
     /**
      * verify queue and queueName
      *
-     * @param queue queue
+     * @param queue     queue
      * @param queueName queue name
      * @return true if the queue name not exists, otherwise return false
      */
     public Result verifyQueue(String queue, String queueName) {
-        Result result=new Result();
+        Result result = new Result();
 
         if (StringUtils.isEmpty(queue)) {
             putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, queue);
@@ -220,13 +232,13 @@ public class QueueService extends BaseService {
         }
 
 
-        if(checkQueueNameExist(queueName)){
+        if (checkQueueNameExist(queueName)) {
             logger.error("queue name {} has exist, can't create again.", queueName);
             putMsg(result, Status.QUEUE_NAME_EXIST, queueName);
             return result;
         }
 
-        if(checkQueueExist(queue)){
+        if (checkQueueExist(queue)) {
             logger.error("queue value {} has exist, can't create again.", queue);
             putMsg(result, Status.QUEUE_VALUE_EXIST, queue);
             return result;
@@ -240,21 +252,35 @@ public class QueueService extends BaseService {
      * check queue exist
      * if exists return true,not exists return false
      * check queue exist
+     *
      * @param queue queue
      * @return true if the queue not exists, otherwise return false
      */
     private boolean checkQueueExist(String queue) {
-        return queueMapper.queryAllQueueList(queue, null).size()>0 ? true : false;
+        return queueMapper.queryAllQueueList(queue, null).size() > 0;
     }
 
     /**
      * check queue name exist
      * if exists return true,not exists return false
+     *
      * @param queueName queue name
      * @return true if the queue name not exists, otherwise return false
      */
     private boolean checkQueueNameExist(String queueName) {
-        return queueMapper.queryAllQueueList(null ,queueName).size() > 0 ? true : false;
+        return queueMapper.queryAllQueueList(null, queueName).size() > 0;
+    }
+
+    /**
+     * check old queue name using by any user
+     * if need to update user
+     *
+     * @param oldQueue old queue name
+     * @param newQueue new queue name
+     * @return true if need to update user
+     */
+    private boolean checkIfQueueIsInUsing (String oldQueue, String newQueue) {
+        return !oldQueue.equals(newQueue) && userMapper.queryUserListByQueue(oldQueue).size() > 0;
     }
 
 }

+ 14 - 0
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java

@@ -95,4 +95,18 @@ public interface UserMapper extends BaseMapper<User> {
      */
     User queryUserByToken(@Param("token") String token);
 
+    /**
+     * query user by queue name
+     * @param queueName queue name
+     * @return user list
+     */
+    List<User> queryUserListByQueue(@Param("queueName") String queueName);
+
+    /**
+     * update user with old queue
+     * @param oldQueue old queue name
+     * @param newQueue new queue name
+     * @return update rows
+     */
+    Integer updateUserQueue(@Param("oldQueue") String oldQueue, @Param("newQueue") String newQueue);
 }

+ 10 - 0
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UserMapper.xml

@@ -69,4 +69,14 @@
         from t_ds_user u ,t_ds_access_token t
         where u.id = t.user_id and token=#{token} and t.expire_time > NOW()
     </select>
+    <select id="queryUserListByQueue" resultType="org.apache.dolphinscheduler.dao.entity.User">
+      select *
+      from t_ds_user
+      where queue = #{queueName}
+    </select>
+    <update id="updateUserQueue" parameterType="java.lang.String">
+        update t_ds_user
+        set queue = #{newQueue}
+        where queue = #{oldQueue}
+    </update>
 </mapper>