Bladeren bron

[Fix-9796] The task queue can be executed normally. (#9797)

* Task group queue run manually

* Test repetition.

* Reference package is not available*.

Co-authored-by: WangJPLeo <wangjipeng@whaleops.com>
WangJPLeo 3 jaren geleden
bovenliggende
commit
d2fe85d7da

+ 8 - 0
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@@ -103,4 +103,12 @@ public interface ExecutorService {
      * @return check result
      */
     boolean checkSubProcessDefinitionValid(ProcessDefinition processDefinition);
+
+    /**
+     * force start Task Instance
+     * @param loginUser
+     * @param queueId
+     * @return
+     */
+    Map<String, Object> forceStartTaskInstance(User loginUser, int queueId);
 }

+ 24 - 3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@@ -103,6 +103,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
     @Autowired
     private ProcessTaskRelationMapper processTaskRelationMapper;
 
+    @Autowired
+    private TaskGroupQueueMapper taskGroupQueueMapper;
+
     /**
      * execute process instance
      *
@@ -350,6 +353,24 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         return result;
     }
 
+    @Override
+    public Map<String, Object> forceStartTaskInstance(User loginUser, int queueId) {
+        Map<String, Object> result = new HashMap<>();
+        TaskGroupQueue taskGroupQueue = taskGroupQueueMapper.selectById(queueId);
+        // check process instance exist
+        ProcessInstance processInstance = processInstanceMapper.selectById(taskGroupQueue.getProcessId());
+        if (processInstance == null) {
+            putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId());
+            return result;
+        }
+
+        // check master exists
+        if (!checkMasterExists(result)) {
+            return result;
+        }
+        return forceStart(processInstance, taskGroupQueue);
+    }
+
     /**
      * check tenant suitable
      *
@@ -445,16 +466,16 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
      * @param processInstance process instance
      * @return update result
      */
-    private Map<String, Object> forceStartTaskInstance(ProcessInstance processInstance, int taskId) {
+    private Map<String, Object> forceStart(ProcessInstance processInstance, TaskGroupQueue taskGroupQueue) {
         Map<String, Object> result = new HashMap<>();
-        TaskGroupQueue taskGroupQueue = processService.loadTaskGroupQueue(taskId);
         if (taskGroupQueue.getStatus() != TaskGroupQueueStatus.WAIT_QUEUE) {
             putMsg(result, Status.TASK_GROUP_QUEUE_ALREADY_START);
             return result;
         }
+
         taskGroupQueue.setForceStart(Flag.YES.getCode());
         processService.updateTaskGroupQueue(taskGroupQueue);
-        processService.sendStartTask2Master(processInstance,taskId
+        processService.sendStartTask2Master(processInstance, taskGroupQueue.getTaskId()
                 ,org.apache.dolphinscheduler.remote.command.CommandType.TASK_FORCE_STATE_EVENT_REQUEST);
         putMsg(result, Status.SUCCESS);
         return result;

+ 5 - 5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java

@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.api.service.impl;
 
 import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.ExecutorService;
 import org.apache.dolphinscheduler.api.service.TaskGroupQueueService;
 import org.apache.dolphinscheduler.api.service.TaskGroupService;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
@@ -59,6 +60,9 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
     @Autowired
     private ProcessService processService;
 
+    @Autowired
+    private ExecutorService executorService;
+
     private static final Logger logger = LoggerFactory.getLogger(TaskGroupServiceImpl.class);
 
     /**
@@ -303,11 +307,7 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
      */
     @Override
     public Map<String, Object> forceStartTask(User loginUser, int queueId) {
-        Map<String, Object> result = new HashMap<>();
-
-        taskGroupQueueService.forceStartTask(queueId, Flag.YES.getCode());
-        putMsg(result, Status.SUCCESS);
-        return result;
+        return executorService.forceStartTaskInstance(loginUser, queueId);
     }
 
     @Override

+ 28 - 0
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java

@@ -31,18 +31,22 @@ import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.enums.RunMode;
+import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
@@ -95,6 +99,12 @@ public class ExecutorServiceTest {
     @Mock
     private MonitorService monitorService;
 
+    @Mock
+    private TaskGroupQueueMapper taskGroupQueueMapper;
+
+    @Mock
+    private ProcessInstanceMapper processInstanceMapper;
+
     private int processDefinitionId = 1;
 
     private long processDefinitionCode = 1L;
@@ -105,10 +115,14 @@ public class ExecutorServiceTest {
 
     private int userId = 1;
 
+    private int taskQueueId = 1;
+
     private ProcessDefinition processDefinition = new ProcessDefinition();
 
     private ProcessInstance processInstance = new ProcessInstance();
 
+    private TaskGroupQueue taskGroupQueue = new TaskGroupQueue();
+
     private User loginUser = new User();
 
     private long projectCode = 1L;
@@ -145,6 +159,11 @@ public class ExecutorServiceTest {
         project.setCode(projectCode);
         project.setName(projectName);
 
+        // taskGroupQueue
+        taskGroupQueue.setId(taskQueueId);
+        taskGroupQueue.setStatus(TaskGroupQueueStatus.WAIT_QUEUE);
+        taskGroupQueue.setProcessId(processInstanceId);
+
         // cronRangeTime
         cronTime = "2020-01-01 00:00:00,2020-01-31 23:00:00";
 
@@ -157,6 +176,15 @@ public class ExecutorServiceTest {
         Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList());
         Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance);
         Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(processDefinition);
+        Mockito.when(taskGroupQueueMapper.selectById(1)).thenReturn(taskGroupQueue);
+        Mockito.when(processInstanceMapper.selectById(1)).thenReturn(processInstance);
+    }
+
+    @Test
+    public void testForceStartTaskInstance(){
+
+        Map<String, Object> result = executorService.forceStartTaskInstance(loginUser, taskQueueId);
+        Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
     }
 
     /**

+ 0 - 10
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java

@@ -174,14 +174,4 @@ public class TaskGroupServiceTest {
         result = taskGroupService.startTaskGroup(loginUser, 1);
         Assert.assertEquals(Status.TASK_GROUP_STATUS_OPENED, result.get(Constants.STATUS));
     }
-
-    @Test
-    public void testWakeTaskFroceManually() {
-
-        TreeMap<Integer, Integer> tm = new TreeMap<>();
-        tm.put(1, 1);
-        Map<String, Object> map1 = taskGroupService.forceStartTask(getLoginUser(), 1);
-        Assert.assertEquals(Status.SUCCESS, map1.get(Constants.STATUS));
-
-    }
 }