|
@@ -20,18 +20,30 @@ package org.apache.dolphinscheduler.api.service;
|
|
|
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.RERUN;
|
|
|
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START;
|
|
|
import static org.mockito.ArgumentMatchers.any;
|
|
|
+import static org.mockito.ArgumentMatchers.argThat;
|
|
|
+import static org.mockito.Mockito.doReturn;
|
|
|
import static org.mockito.Mockito.times;
|
|
|
import static org.mockito.Mockito.verify;
|
|
|
|
|
|
import org.apache.dolphinscheduler.api.enums.ExecuteType;
|
|
|
import org.apache.dolphinscheduler.api.enums.Status;
|
|
|
+import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
|
|
|
import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
|
|
|
import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl;
|
|
|
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
|
|
|
import org.apache.dolphinscheduler.common.Constants;
|
|
|
-import org.apache.dolphinscheduler.common.enums.*;
|
|
|
+import org.apache.dolphinscheduler.common.enums.CommandType;
|
|
|
+import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
|
|
|
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
|
|
|
+import org.apache.dolphinscheduler.common.enums.Priority;
|
|
|
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
|
|
|
+import org.apache.dolphinscheduler.common.enums.RunMode;
|
|
|
+import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
|
|
|
+import org.apache.dolphinscheduler.common.enums.WarningType;
|
|
|
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
|
|
|
import org.apache.dolphinscheduler.common.model.Server;
|
|
|
import org.apache.dolphinscheduler.dao.entity.Command;
|
|
|
+import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
|
|
|
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
|
|
|
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
|
|
|
import org.apache.dolphinscheduler.dao.entity.Project;
|
|
@@ -45,17 +57,18 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
|
|
|
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
|
|
|
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
|
|
|
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
|
|
|
-import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
|
|
|
import org.apache.dolphinscheduler.service.process.ProcessService;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
|
+import java.util.Date;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Optional;
|
|
|
|
|
|
+import org.assertj.core.util.Lists;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
@@ -178,7 +191,8 @@ public class ExecutorServiceTest {
|
|
|
.thenReturn(checkProjectAndAuth());
|
|
|
Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition);
|
|
|
Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant());
|
|
|
- Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1);
|
|
|
+ doReturn(1).when(processService).createCommand(argThat(c -> c.getId() == null));
|
|
|
+ doReturn(0).when(processService).createCommand(argThat(c -> c.getId() != null));
|
|
|
Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList());
|
|
|
Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(Optional.ofNullable(processInstance));
|
|
|
Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(processDefinition);
|
|
@@ -237,6 +251,50 @@ public class ExecutorServiceTest {
|
|
|
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testComplementWithDependentMode() {
|
|
|
+ Schedule schedule = new Schedule();
|
|
|
+ schedule.setStartTime(new Date());
|
|
|
+ schedule.setEndTime(new Date());
|
|
|
+ schedule.setCrontab("0 0 7 * * ? *");
|
|
|
+ schedule.setFailureStrategy(FailureStrategy.CONTINUE);
|
|
|
+ schedule.setReleaseState(ReleaseState.OFFLINE);
|
|
|
+ schedule.setWarningType(WarningType.NONE);
|
|
|
+ schedule.setCreateTime(new Date());
|
|
|
+ schedule.setUpdateTime(new Date());
|
|
|
+ List<Schedule> schedules = Lists.newArrayList(schedule);
|
|
|
+ Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(
|
|
|
+ processDefinitionCode))
|
|
|
+ .thenReturn(schedules);
|
|
|
+
|
|
|
+ DependentProcessDefinition dependentProcessDefinition = new DependentProcessDefinition();
|
|
|
+ dependentProcessDefinition.setProcessDefinitionCode(2);
|
|
|
+ dependentProcessDefinition.setProcessDefinitionVersion(1);
|
|
|
+ dependentProcessDefinition.setTaskDefinitionCode(1);
|
|
|
+ dependentProcessDefinition.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
|
|
|
+ dependentProcessDefinition.setTaskParams(
|
|
|
+ "{\"localParams\":[],\"resourceList\":[],\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"depTaskCode\":2,\"status\":\"SUCCESS\"}]}]},\"conditionResult\":{\"successNode\":[1],\"failedNode\":[1]}}");
|
|
|
+ Mockito.when(processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode))
|
|
|
+ .thenReturn(Lists.newArrayList(dependentProcessDefinition));
|
|
|
+
|
|
|
+ Map<Long, String> processDefinitionWorkerGroupMap = new HashMap<>();
|
|
|
+ processDefinitionWorkerGroupMap.put(1L, Constants.DEFAULT_WORKER_GROUP);
|
|
|
+ Mockito.when(processService.queryWorkerGroupByProcessDefinitionCodes(Lists.newArrayList(1L)))
|
|
|
+ .thenReturn(processDefinitionWorkerGroupMap);
|
|
|
+
|
|
|
+ Command command = new Command();
|
|
|
+ command.setId(1);
|
|
|
+ command.setCommandType(CommandType.COMPLEMENT_DATA);
|
|
|
+ command.setCommandParam(
|
|
|
+ "{\"StartNodeList\":\"1\",\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}");
|
|
|
+ command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
|
|
|
+ command.setProcessDefinitionCode(processDefinitionCode);
|
|
|
+ command.setExecutorId(1);
|
|
|
+
|
|
|
+ int count = executorService.createComplementDependentCommand(schedules, command);
|
|
|
+ Assert.assertEquals(1, count);
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
* date error
|
|
|
*/
|