Browse Source

Merge pull request #21 from apache/refactor-worker

TaskPriority refactor (#2097)
Tboy 5 years ago
parent
commit
b42357a16d

+ 2 - 5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java

@@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.entity.User;
 import io.swagger.annotations.*;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
 import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -240,8 +239,7 @@ public class ProcessInstanceController extends BaseController{
             logger.info("delete process instance by id, login user:{}, project name:{}, process instance id:{}",
                     loginUser.getUserName(), projectName, processInstanceId);
             // task queue
-            ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
-            Map<String, Object> result = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId,tasksQueue);
+            Map<String, Object> result = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId);
             return returnDataList(result);
         }catch (Exception e){
             logger.error(DELETE_PROCESS_INSTANCE_BY_ID_ERROR.getMsg(),e);
@@ -370,7 +368,6 @@ public class ProcessInstanceController extends BaseController{
             logger.info("delete process instance by ids, login user:{}, project name:{}, process instance ids :{}",
                     loginUser.getUserName(), projectName, processInstanceIds);
             // task queue
-            ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
             Map<String, Object> result = new HashMap<>(5);
             List<String> deleteFailedIdList = new ArrayList<>();
             if(StringUtils.isNotEmpty(processInstanceIds)){
@@ -379,7 +376,7 @@ public class ProcessInstanceController extends BaseController{
                 for (String strProcessInstanceId:processInstanceIdArray) {
                     int processInstanceId = Integer.parseInt(strProcessInstanceId);
                     try {
-                        Map<String, Object> deleteResult = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId,tasksQueue);
+                        Map<String, Object> deleteResult = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId);
                         if(!Status.SUCCESS.equals(deleteResult.get(Constants.STATUS))){
                             deleteFailedIdList.add(strProcessInstanceId);
                             logger.error((String)deleteResult.get(Constants.MSG));

+ 2 - 5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java

@@ -29,8 +29,6 @@ import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.entity.*;
 import org.apache.dolphinscheduler.dao.mapper.*;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
-import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -318,9 +316,8 @@ public class DataAnalysisService extends BaseService{
             return result;
         }
 
-        ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
-        List<String> tasksQueueList = tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
-        List<String> tasksKillList = tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_KILL);
+        List<String> tasksQueueList = new ArrayList<>();
+        List<String> tasksKillList = new ArrayList<>();
 
         Map<String,Integer> dataMap = new HashMap<>();
         if (loginUser.getUserType() == UserType.ADMIN_USER){

+ 1 - 51
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java

@@ -38,7 +38,6 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import org.apache.dolphinscheduler.dao.entity.*;
 import org.apache.dolphinscheduler.dao.mapper.*;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -404,8 +403,6 @@ public class ProcessInstanceService extends BaseDAGService {
             processInstance.setProcessInstanceJson(processInstanceJson);
             processInstance.setGlobalParams(globalParams);
         }
-//        int update = processDao.updateProcessInstance(processInstanceId, processInstanceJson,
-//                globalParams, schedule, flag, locations, connects);
         int update = processService.updateProcessInstance(processInstance);
         int updateDefine = 1;
         if (syncDefine && StringUtils.isNotEmpty(processInstanceJson)) {
@@ -472,11 +469,10 @@ public class ProcessInstanceService extends BaseDAGService {
      * @param loginUser login user
      * @param projectName project name
      * @param processInstanceId process instance id
-     * @param tasksQueue task queue
      * @return delete result code
      */
     @Transactional(rollbackFor = Exception.class)
-    public Map<String, Object> deleteProcessInstanceById(User loginUser, String projectName, Integer processInstanceId, ITaskQueue tasksQueue) {
+    public Map<String, Object> deleteProcessInstanceById(User loginUser, String projectName, Integer processInstanceId) {
 
         Map<String, Object> result = new HashMap<>(5);
         Project project = projectMapper.queryByName(projectName);
@@ -494,52 +490,6 @@ public class ProcessInstanceService extends BaseDAGService {
             return result;
         }
 
-        //process instance priority
-        int processInstancePriority = processInstance.getProcessInstancePriority().ordinal();
-        // delete zk queue
-        if (CollectionUtils.isNotEmpty(taskInstanceList)){
-            for (TaskInstance taskInstance : taskInstanceList){
-                // task instance priority
-                int taskInstancePriority = taskInstance.getTaskInstancePriority().ordinal();
-
-                StringBuilder nodeValueSb = new StringBuilder(100);
-                nodeValueSb.append(processInstancePriority)
-                        .append(UNDERLINE)
-                        .append(processInstanceId)
-                        .append(UNDERLINE)
-                        .append(taskInstancePriority)
-                        .append(UNDERLINE)
-                        .append(taskInstance.getId())
-                        .append(UNDERLINE);
-
-                int taskWorkerGroupId = processService.getTaskWorkerGroupId(taskInstance);
-                WorkerGroup workerGroup = workerGroupMapper.selectById(taskWorkerGroupId);
-
-                if(workerGroup == null){
-                    nodeValueSb.append(DEFAULT_WORKER_ID);
-                }else {
-
-                    String ips = workerGroup.getIpList();
-                    StringBuilder ipSb = new StringBuilder(100);
-                    String[] ipArray = ips.split(COMMA);
-
-                    for (String ip : ipArray) {
-                        long ipLong = IpUtils.ipToLong(ip);
-                        ipSb.append(ipLong).append(COMMA);
-                    }
-
-                    if(ipSb.length() > 0) {
-                        ipSb.deleteCharAt(ipSb.length() - 1);
-                    }
-                    nodeValueSb.append(ipSb);
-                }
-
-                logger.info("delete task queue node : {}",nodeValueSb.toString());
-                tasksQueue.removeNode(org.apache.dolphinscheduler.common.Constants.DOLPHINSCHEDULER_TASKS_QUEUE, nodeValueSb.toString());
-
-            }
-        }
-
         // delete database cascade
         int delete = processService.deleteWorkProcessInstanceById(processInstanceId);
         processService.deleteAllSubWorkProcessByParentId(processInstanceId);

+ 1 - 27
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java

@@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.*;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
 import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
 import org.junit.After;
 import org.junit.Assert;
@@ -74,8 +73,7 @@ public class DataAnalysisServiceTest {
     @Mock
     TaskInstanceMapper taskInstanceMapper;
 
-    @Mock
-    ITaskQueue taskQueue;
+
 
     @Mock
     ProcessService processService;
@@ -183,30 +181,6 @@ public class DataAnalysisServiceTest {
 
     }
 
-    @Test
-    public void testCountQueueState(){
-
-        PowerMockito.mockStatic(TaskQueueFactory.class);
-        List<String>  taskQueueList = new ArrayList<>(1);
-        taskQueueList.add("1_0_1_1_-1");
-        List<String>  taskKillList = new ArrayList<>(1);
-        taskKillList.add("1-0");
-        PowerMockito.when(taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE)).thenReturn(taskQueueList);
-        PowerMockito.when(taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_KILL)).thenReturn(taskKillList);
-        PowerMockito.when(TaskQueueFactory.getTaskQueueInstance()).thenReturn(taskQueue);
-        //checkProject false
-        Map<String, Object> result = dataAnalysisService.countQueueState(user,2);
-        Assert.assertTrue(result.isEmpty());
-
-        result = dataAnalysisService.countQueueState(user,1);
-        Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS));
-        //admin
-        user.setUserType(UserType.ADMIN_USER);
-        result = dataAnalysisService.countQueueState(user,1);
-        Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS));
-
-    }
-
     /**
      *  get list
      * @return

+ 1 - 0
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@@ -194,6 +194,7 @@ public class TaskInstance implements Serializable {
     /**
      * workerGroup
      */
+    @TableField(exist = false)
     private String workerGroup;
 
     public ProcessInstance getProcessInstance() {

+ 2 - 1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java

@@ -132,7 +132,8 @@ public class TaskPriority {
      */
     public static TaskPriority of(String taskPriorityInfo){
         String[] parts = taskPriorityInfo.split(UNDERLINE);
-        if (parts.length != 4) {
+
+        if (parts.length != 5) {
             throw new IllegalArgumentException(String.format("TaskPriority : %s illegal.", taskPriorityInfo));
         }
         TaskPriority taskPriority = new TaskPriority(

+ 4 - 1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.consumer.TaskUpdateQueueConsumer;
 import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
 import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
 import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
@@ -37,6 +38,7 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob;
 import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
+import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl;
 import org.quartz.SchedulerException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -166,7 +168,8 @@ public class MasterServer implements IStoppable {
             logger.error("start Quartz failed", e);
         }
 
-
+        TaskUpdateQueueConsumer taskUpdateQueueConsumer = SpringApplicationContext.getBean(TaskUpdateQueueConsumer.class);
+        taskUpdateQueueConsumer.start();
         /**
          *  register hooks, which are called before the process exits
          */

+ 2 - 3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@@ -89,7 +89,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
         this.cancel = false;
         this.taskInstance = taskInstance;
         this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
-        this.taskUpdateQueue = new TaskUpdateQueueImpl();
+        this.taskUpdateQueue = SpringApplicationContext.getBean(TaskUpdateQueueImpl.class);
     }
 
     /**
@@ -180,8 +180,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
                     processInstance.getId(),
                     taskInstance.getProcessInstancePriority().getCode(),
                     taskInstance.getId(),
-                    taskInstance.getWorkerGroup());
-
+                    org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
             taskUpdateQueue.put(taskPriorityInfo);
             logger.info(String.format("master submit success, task : %s", taskInstance.getName()) );
             return true;