123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755 |
- /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package cn.escheduler.dao;
- import cn.escheduler.common.Constants;
- import cn.escheduler.common.enums.*;
- import cn.escheduler.common.model.DateInterval;
- import cn.escheduler.common.model.TaskNode;
- import cn.escheduler.common.process.Property;
- import cn.escheduler.common.queue.ITaskQueue;
- import cn.escheduler.common.queue.TaskQueueFactory;
- import cn.escheduler.common.task.subprocess.SubProcessParameters;
- import cn.escheduler.common.utils.DateUtils;
- import cn.escheduler.common.utils.IpUtils;
- import cn.escheduler.common.utils.JSONUtils;
- import cn.escheduler.common.utils.ParameterUtils;
- import cn.escheduler.dao.mapper.*;
- import cn.escheduler.dao.model.*;
- import cn.escheduler.dao.utils.cron.CronUtils;
- import com.alibaba.fastjson.JSONObject;
- import com.cronutils.model.Cron;
- import org.apache.commons.lang3.ArrayUtils;
- import org.apache.commons.lang3.StringUtils;
- import org.quartz.CronExpression;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import org.springframework.transaction.annotation.Transactional;
- import java.util.*;
- import java.util.stream.Collectors;
- import static cn.escheduler.common.Constants.*;
- import static cn.escheduler.dao.datasource.ConnectionFactory.getMapper;
- /**
- * process relative dao that some mappers in this.
- */
- @Component
- public class ProcessDao extends AbstractBaseDao {
- private final Logger logger = LoggerFactory.getLogger(getClass());
- private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
- ExecutionStatus.RUNNING_EXEUTION.ordinal(),
- ExecutionStatus.READY_PAUSE.ordinal(),
- // ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
- ExecutionStatus.READY_STOP.ordinal()};
- @Autowired
- private UserMapper userMapper;
- @Autowired
- private ProcessDefinitionMapper processDefineMapper;
- @Autowired
- private ProcessInstanceMapper processInstanceMapper;
- @Autowired
- private DataSourceMapper dataSourceMapper;
- @Autowired
- private ProcessInstanceMapMapper processInstanceMapMapper;
- @Autowired
- private TaskInstanceMapper taskInstanceMapper;
- @Autowired
- private CommandMapper commandMapper;
- @Autowired
- private ScheduleMapper scheduleMapper;
- @Autowired
- private UdfFuncMapper udfFuncMapper;
- @Autowired
- private ResourceMapper resourceMapper;
- @Autowired
- private WorkerGroupMapper workerGroupMapper;
- @Autowired
- private ErrorCommandMapper errorCommandMapper;
- @Autowired
- private WorkerServerMapper workerServerMapper;
- @Autowired
- private TenantMapper tenantMapper;
- /**
- * task queue impl
- */
- protected ITaskQueue taskQueue;
- public ProcessDao(){
- init();
- }
- /**
- * initialize
- */
- @Override
- protected void init() {
- userMapper = getMapper(UserMapper.class);
- processDefineMapper = getMapper(ProcessDefinitionMapper.class);
- processInstanceMapper = getMapper(ProcessInstanceMapper.class);
- dataSourceMapper = getMapper(DataSourceMapper.class);
- processInstanceMapMapper = getMapper(ProcessInstanceMapMapper.class);
- taskInstanceMapper = getMapper(TaskInstanceMapper.class);
- commandMapper = getMapper(CommandMapper.class);
- scheduleMapper = getMapper(ScheduleMapper.class);
- udfFuncMapper = getMapper(UdfFuncMapper.class);
- resourceMapper = getMapper(ResourceMapper.class);
- workerGroupMapper = getMapper(WorkerGroupMapper.class);
- workerServerMapper = getMapper(WorkerServerMapper.class);
- taskQueue = TaskQueueFactory.getTaskQueueInstance();
- tenantMapper = getMapper(TenantMapper.class);
- }
- /**
- * find one command from command queue, construct process instance
- * @param logger
- * @param host
- * @param validThreadNum
- * @return
- */
- @Transactional(value = "TransactionManager",rollbackFor = Exception.class)
- public ProcessInstance scanCommand(Logger logger, String host, int validThreadNum){
- ProcessInstance processInstance = null;
- Command command = findOneCommand();
- if (command == null) {
- return null;
- }
- logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));
- try{
- processInstance = constructProcessInstance(command, host);
- //cannot construct process instance, return null;
- if(processInstance == null){
- logger.error("scan command, command parameter is error: %s", command.toString());
- delCommandByid(command.getId());
- saveErrorCommand(command, "process instance is null");
- return null;
- }else if(!checkThreadNum(command, validThreadNum)){
- logger.info("there is not enough thread for this command: {}",command.toString() );
- return setWaitingThreadProcess(command, processInstance);
- }else{
- processInstance.setCommandType(command.getCommandType());
- processInstance.addHistoryCmd(command.getCommandType());
- saveProcessInstance(processInstance);
- this.setSubProcessParam(processInstance);
- delCommandByid(command.getId());
- return processInstance;
- }
- }catch (Exception e){
- logger.error("scan command error ", e);
- saveErrorCommand(command, e.toString());
- delCommandByid(command.getId());
- }
- return null;
- }
- private void saveErrorCommand(Command command, String message) {
- ErrorCommand errorCommand = new ErrorCommand(command, message);
- this.errorCommandMapper.insert(errorCommand);
- }
- /**
- * set process waiting thread
- * @param command
- * @param processInstance
- * @return
- */
- private ProcessInstance setWaitingThreadProcess(Command command, ProcessInstance processInstance) {
- processInstance.setState(ExecutionStatus.WAITTING_THREAD);
- if(command.getCommandType() != CommandType.RECOVER_WAITTING_THREAD){
- processInstance.addHistoryCmd(command.getCommandType());
- }
- saveProcessInstance(processInstance);
- this.setSubProcessParam(processInstance);
- createRecoveryWaitingThreadCommand(command, processInstance);
- return null;
- }
- private boolean checkThreadNum(Command command, int validThreadNum) {
- int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId());
- return validThreadNum >= commandThreadCount;
- }
- /**
- * insert one command
- */
- public int createCommand(Command command) {
- int result = 0;
- if (command != null){
- result = commandMapper.insert(command);
- }
- return result;
- }
- /**
- *
- * find one command from queue list
- * @return
- */
- public Command findOneCommand(){
- return commandMapper.queryOneCommand();
- }
- /**
- * check the input command exists in queue list
- * @param command
- * @return
- */
- public Boolean verifyIsNeedCreateCommand(Command command){
- Boolean isNeedCreate = true;
- Map<CommandType,Integer> cmdTypeMap = new HashMap<CommandType,Integer>();
- cmdTypeMap.put(CommandType.REPEAT_RUNNING,1);
- cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS,1);
- cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS,1);
- CommandType commandType = command.getCommandType();
- if(cmdTypeMap.containsKey(commandType)){
- JSONObject cmdParamObj = (JSONObject) JSONObject.parse(command.getCommandParam());
- JSONObject tempObj;
- int processInstanceId = cmdParamObj.getInteger(CMDPARAM_RECOVER_PROCESS_ID_STRING);
- List<Command> commands = commandMapper.queryAllCommand();
- //遍历所有命令
- for (Command tmpCommand:commands){
- if(cmdTypeMap.containsKey(tmpCommand.getCommandType())){
- tempObj = (JSONObject) JSONObject.parse(tmpCommand.getCommandParam());
- if(tempObj != null && processInstanceId == tempObj.getInteger(CMDPARAM_RECOVER_PROCESS_ID_STRING)){
- isNeedCreate = false;
- break;
- }
- }
- }
- }
- return isNeedCreate;
- }
- /**
- * find process instance detail by id
- * @param processId
- * @return
- */
- public ProcessInstance findProcessInstanceDetailById(int processId){
- return processInstanceMapper.queryDetailById(processId);
- }
- /**
- * find process instance by id
- * @param processId
- * @return
- */
- public ProcessInstance findProcessInstanceById(int processId){
- return processInstanceMapper.queryById(processId);
- }
- /**
- * find process instance by scheduler time.
- * @param defineId
- * @param scheduleTime
- * @return
- */
- public ProcessInstance findProcessInstanceByScheduleTime(int defineId, Date scheduleTime){
- return processInstanceMapper.queryByScheduleTime(defineId,
- DateUtils.dateToString(scheduleTime), 0, null, null);
- }
- /**
- * find process define by id.
- * @param processDefinitionId
- * @return
- */
- public ProcessDefinition findProcessDefineById(int processDefinitionId) {
- return processDefineMapper.queryByDefineId(processDefinitionId);
- }
- /**
- * delete work process instance by id
- * @param processInstanceId
- * @return
- */
- public int deleteWorkProcessInstanceById(int processInstanceId){
- return processInstanceMapper.delete(processInstanceId);
- }
- /**
- *
- * delete all sub process by parent instance id
- * @return
- */
- public int deleteAllSubWorkProcessByParentId(int processInstanceId){
- List<Integer> subProcessIdList = processInstanceMapper.querySubIdListByParentId(processInstanceId);
- for(Integer subId : subProcessIdList ){
- deleteAllSubWorkProcessByParentId(subId);
- deleteWorkProcessMapByParentId(subId);
- deleteWorkProcessInstanceById(subId);
- }
- return 1;
- }
- /**
- * create process define
- * @param processDefinition
- * @return
- */
- public int createProcessDefine(ProcessDefinition processDefinition){
- int count = 0;
- if(processDefinition != null){
- count = this.processDefineMapper.insert(processDefinition);
- }
- return count;
- }
- /**
- * calculate sub process number in the process define.
- * @param processDefinitionId
- * @return
- */
- private Integer workProcessThreadNumCount(Integer processDefinitionId){
- List<String> ids = new ArrayList<>();
- recurseFindSubProcessId(processDefinitionId, ids);
- return ids.size()+1;
- }
- /**
- * recursive query sub process definition id by parent id.
- * @param parentId
- * @param ids
- */
- public void recurseFindSubProcessId(int parentId, List<String> ids){
- ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(parentId);
- String processDefinitionJson = processDefinition.getProcessDefinitionJson();
- ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
- List<TaskNode> taskNodeList = processData.getTasks();
- if (taskNodeList != null && taskNodeList.size() > 0){
- for (TaskNode taskNode : taskNodeList){
- String parameter = taskNode.getParams();
- if (parameter.contains(CMDPARAM_SUB_PROCESS_DEFINE_ID)){
- SubProcessParameters subProcessParam = JSONObject.parseObject(parameter, SubProcessParameters.class);
- ids.add(String.valueOf(subProcessParam.getProcessDefinitionId()));
- recurseFindSubProcessId(subProcessParam.getProcessDefinitionId(),ids);
- }
- }
- }
- }
- /**
- * create recovery waiting thread command when thread pool is not enough for the process instance.
- * sub work process instance need not to create recovery command.
- * create recovery waiting thread command and delete origin command at the same time.
- * if the recovery command is exists, only update the field update_time
- * @param originCommand
- * @param processInstance
- */
- public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) {
- // sub process doesnot need to create wait command
- if(processInstance.getIsSubProcess() == Flag.YES){
- if(originCommand != null){
- commandMapper.delete(originCommand.getId());
- }
- return;
- }
- Map<String, String> cmdParam = new HashMap<>();
- cmdParam.put(Constants.CMDPARAM_RECOVERY_WAITTING_THREAD, String.valueOf(processInstance.getId()));
- // process instance quit by "waiting thread" state
- if(originCommand == null){
- Command command = new Command(
- CommandType.RECOVER_WAITTING_THREAD,
- processInstance.getTaskDependType(),
- processInstance.getFailureStrategy(),
- processInstance.getExecutorId(),
- processInstance.getProcessDefinitionId(),
- JSONUtils.toJson(cmdParam),
- processInstance.getWarningType(),
- processInstance.getWarningGroupId(),
- processInstance.getScheduleTime(),
- processInstance.getProcessInstancePriority()
- );
- saveCommand(command);
- return ;
- }
- // update the command time if current command if recover from waiting
- if(originCommand.getCommandType() == CommandType.RECOVER_WAITTING_THREAD){
- originCommand.setUpdateTime(new Date());
- saveCommand(originCommand);
- }else{
- // delete old command and create new waiting thread command
- commandMapper.delete(originCommand.getId());
- originCommand.setId(0);
- originCommand.setCommandType(CommandType.RECOVER_WAITTING_THREAD);
- originCommand.setUpdateTime(new Date());
- originCommand.setCommandParam(JSONUtils.toJson(cmdParam));
- originCommand.setProcessInstancePriority(processInstance.getProcessInstancePriority());
- saveCommand(originCommand);
- }
- }
- /**
- * get schedule time from command
- * @param command
- * @param cmdParam
- * @return
- */
- private Date getScheduleTime(Command command, Map<String, String> cmdParam){
- Date scheduleTime = command.getScheduleTime();
- if(scheduleTime == null){
- if(cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)){
- scheduleTime = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
- }
- }
- return scheduleTime;
- }
- /**
- * generate a new work process instance from command.
- * @param processDefinition
- * @param command
- * @param cmdParam
- * @return
- */
- private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition,
- Command command,
- Map<String, String> cmdParam){
- ProcessInstance processInstance = new ProcessInstance(processDefinition);
- processInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
- processInstance.setRecovery(Flag.NO);
- processInstance.setStartTime(new Date());
- processInstance.setRunTimes(1);
- processInstance.setMaxTryTimes(0);
- processInstance.setProcessDefinitionId(command.getProcessDefinitionId());
- processInstance.setCommandParam(command.getCommandParam());
- processInstance.setCommandType(command.getCommandType());
- processInstance.setIsSubProcess(Flag.NO);
- processInstance.setTaskDependType(command.getTaskDependType());
- processInstance.setFailureStrategy(command.getFailureStrategy());
- processInstance.setExecutorId(command.getExecutorId());
- WarningType warningType = command.getWarningType() == null ? WarningType.NONE : command.getWarningType();
- processInstance.setWarningType(warningType);
- Integer warningGroupId = command.getWarningGroupId() == null ? 0 : command.getWarningGroupId();
- processInstance.setWarningGroupId(warningGroupId);
- // schedule time
- Date scheduleTime = getScheduleTime(command, cmdParam);
- if(scheduleTime != null){
- processInstance.setScheduleTime(scheduleTime);
- }
- processInstance.setCommandStartTime(command.getStartTime());
- processInstance.setLocations(processDefinition.getLocations());
- processInstance.setConnects(processDefinition.getConnects());
- // curing global params
- processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- getCommandTypeIfComplement(processInstance, command),
- processInstance.getScheduleTime()));
- //copy process define json to process instance
- processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson());
- // set process instance priority
- processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
- int workerGroupId = command.getWorkerGroupId() == 0 ? -1 : command.getWorkerGroupId();
- processInstance.setWorkerGroupId(workerGroupId);
- processInstance.setTimeout(processDefinition.getTimeout());
- processInstance.setTenantId(processDefinition.getTenantId());
- return processInstance;
- }
- /**
- * get process tenant
- * there is tenant id in definition, use the tenant of the definition.
- * if there is not tenant id in the definiton or the tenant not exist
- * use definition creator's tenant.
- * @param tenantId
- * @param userId
- * @return
- */
- public Tenant getTenantForProcess(int tenantId, int userId){
- Tenant tenant = null;
- if(tenantId >= 0){
- tenant = tenantMapper.queryById(tenantId);
- }
- if(tenant == null){
- User user = userMapper.queryById(userId);
- tenant = tenantMapper.queryById(user.getTenantId());
- }
- return tenant;
- }
- /**
- * check command parameters is valid
- * @param command
- * @param cmdParam
- * @return
- */
- private Boolean checkCmdParam(Command command, Map<String, String> cmdParam){
- if(command.getTaskDependType() == TaskDependType.TASK_ONLY || command.getTaskDependType()== TaskDependType.TASK_PRE){
- if(cmdParam == null
- || !cmdParam.containsKey(Constants.CMDPARAM_START_NODE_NAMES)
- || cmdParam.get(Constants.CMDPARAM_START_NODE_NAMES).isEmpty()){
- logger.error(String.format("command node depend type is %s, but start nodes is null ", command.getTaskDependType().toString()));
- return false;
- }
- }
- return true;
- }
- /**
- * construct process instance according to one command.
- * @param command
- * @param host
- * @return
- */
- private ProcessInstance constructProcessInstance(Command command, String host){
- ProcessInstance processInstance = null;
- CommandType commandType = command.getCommandType();
- Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
- ProcessDefinition processDefinition = null;
- if(command.getProcessDefinitionId() != 0){
- processDefinition = processDefineMapper.queryByDefineId(command.getProcessDefinitionId());
- if(processDefinition == null){
- logger.error(String.format("cannot find the work process define! define id : %d", command.getProcessDefinitionId()));
- return null;
- }
- }
- if(cmdParam != null ){
- Integer processInstanceId = 0;
- // recover from failure or pause tasks
- if(cmdParam.containsKey(Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING)) {
- String processId = cmdParam.get(Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING);
- processInstanceId = Integer.parseInt(processId);
- if (processInstanceId == 0) {
- logger.error("command parameter is error, [ ProcessInstanceId ] is 0");
- return null;
- }
- }else if(cmdParam.containsKey(Constants.CMDPARAM_SUB_PROCESS)){
- // sub process map
- String pId = cmdParam.get(Constants.CMDPARAM_SUB_PROCESS);
- processInstanceId = Integer.parseInt(pId);
- }else if(cmdParam.containsKey(Constants.CMDPARAM_RECOVERY_WAITTING_THREAD)){
- // waiting thread command
- String pId = cmdParam.get(Constants.CMDPARAM_RECOVERY_WAITTING_THREAD);
- processInstanceId = Integer.parseInt(pId);
- }
- if(processInstanceId ==0){
- processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
- }else{
- processInstance = this.findProcessInstanceDetailById(processInstanceId);
- }
- processDefinition = processDefineMapper.queryByDefineId(processInstance.getProcessDefinitionId());
- processInstance.setProcessDefinition(processDefinition);
- //reset command parameter
- if(processInstance.getCommandParam() != null){
- Map<String, String> processCmdParam = JSONUtils.toMap(processInstance.getCommandParam());
- for(String key : processCmdParam.keySet()){
- if(!cmdParam.containsKey(key)){
- cmdParam.put(key,processCmdParam.get(key));
- }
- }
- }
- // reset command parameter if sub process
- if(cmdParam.containsKey(Constants.CMDPARAM_SUB_PROCESS)){
- processInstance.setCommandParam(command.getCommandParam());
- }
- }else{
- // generate one new process instance
- processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
- }
- if(!checkCmdParam(command, cmdParam)){
- logger.error("command parameter check failed!");
- return null;
- }
- if(command.getScheduleTime() != null){
- processInstance.setScheduleTime(command.getScheduleTime());
- }
- processInstance.setHost(host);
- ExecutionStatus runStatus = ExecutionStatus.RUNNING_EXEUTION;
- int runTime = processInstance.getRunTimes();
- switch (commandType){
- case START_PROCESS:
- break;
- case START_FAILURE_TASK_PROCESS:
- // find failed tasks and init these tasks
- List<Integer> failedList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.FAILURE);
- List<Integer> toleranceList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.NEED_FAULT_TOLERANCE);
- List<Integer> killedList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.KILL);
- cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);
- failedList.addAll(killedList);
- failedList.addAll(toleranceList);
- for(Integer taskId : failedList){
- initTaskInstance(this.findTaskInstanceById(taskId));
- }
- cmdParam.put(Constants.CMDPARAM_RECOVERY_START_NODE_STRING,
- String.join(Constants.COMMA, convertIntListToString(failedList)));
- processInstance.setCommandParam(JSONUtils.toJson(cmdParam));
- processInstance.setRunTimes(runTime +1 );
- break;
- case START_CURRENT_TASK_PROCESS:
- break;
- case RECOVER_WAITTING_THREAD:
- break;
- case RECOVER_SUSPENDED_PROCESS:
- // find pause tasks and init task's state
- cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);
- List<Integer> suspendedNodeList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE);
- List<Integer> stopNodeList = findTaskIdByInstanceState(processInstance.getId(),
- ExecutionStatus.KILL);
- suspendedNodeList.addAll(stopNodeList);
- for(Integer taskId : suspendedNodeList){
- // 把暂停状态初始化
- initTaskInstance(this.findTaskInstanceById(taskId));
- }
- cmdParam.put(Constants.CMDPARAM_RECOVERY_START_NODE_STRING, String.join(",", convertIntListToString(suspendedNodeList)));
- processInstance.setCommandParam(JSONUtils.toJson(cmdParam));
- processInstance.setRunTimes(runTime +1);
- break;
- case RECOVER_TOLERANCE_FAULT_PROCESS:
- // recover tolerance fault process
- processInstance.setRecovery(Flag.YES);
- runStatus = processInstance.getState();
- break;
- case COMPLEMENT_DATA:
- // delete all the valid tasks when complement data
- List<TaskInstance> taskInstanceList = this.findValidTaskListByProcessId(processInstance.getId());
- for(TaskInstance taskInstance : taskInstanceList){
- taskInstance.setFlag(Flag.NO);
- this.updateTaskInstance(taskInstance);
- }
- break;
- case REPEAT_RUNNING:
- // delete the recover task names from command parameter
- if(cmdParam.containsKey(Constants.CMDPARAM_RECOVERY_START_NODE_STRING)){
- cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);
- processInstance.setCommandParam(JSONUtils.toJson(cmdParam));
- }
- // delete all the valid tasks when repeat running
- List<TaskInstance> validTaskList = findValidTaskListByProcessId(processInstance.getId());
- for(TaskInstance taskInstance : validTaskList){
- taskInstance.setFlag(Flag.NO);
- updateTaskInstance(taskInstance);
- }
- processInstance.setStartTime(new Date());
- processInstance.setEndTime(null);
- processInstance.setRunTimes(runTime +1);
- initComplementDataParam(processDefinition, processInstance, cmdParam);
- break;
- case SCHEDULER:
- break;
- default:
- break;
- }
- processInstance.setState(runStatus);
- return processInstance;
- }
- /**
- * return complement data if the process start with complement data
- */
- private CommandType getCommandTypeIfComplement(ProcessInstance processInstance, Command command){
- if(CommandType.COMPLEMENT_DATA == processInstance.getCmdTypeIfComplement()){
- return CommandType.COMPLEMENT_DATA;
- }else{
- return command.getCommandType();
- }
- }
- /**
- * initialize complement data parameters
- * @param processDefinition
- * @param processInstance
- * @param cmdParam
- */
- private void initComplementDataParam(ProcessDefinition processDefinition, ProcessInstance processInstance, Map<String, String> cmdParam) {
- if(!processInstance.isComplementData()){
- return;
- }
- Date startComplementTime = DateUtils.parse(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE),
- YYYY_MM_DD_HH_MM_SS);
- processInstance.setScheduleTime(startComplementTime);
- processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
- }
- /**
- * set sub work process parameters.
- * handle sub work process instance, update relation table and command parameters
- * set sub work process flag, extends parent work process command parameters.
- */
- public ProcessInstance setSubProcessParam(ProcessInstance subProcessInstance){
- String cmdParam = subProcessInstance.getCommandParam();
- if(StringUtils.isEmpty(cmdParam)){
- return subProcessInstance;
- }
- Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
- // write sub process id into cmd param.
- if(paramMap.containsKey(CMDPARAM_SUB_PROCESS)
- && CMDPARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMDPARAM_SUB_PROCESS))){
- paramMap.remove(CMDPARAM_SUB_PROCESS);
- paramMap.put(CMDPARAM_SUB_PROCESS, String.valueOf(subProcessInstance.getId()));
- subProcessInstance.setCommandParam(JSONUtils.toJson(paramMap));
- subProcessInstance.setIsSubProcess(Flag.YES);
- this.saveProcessInstance(subProcessInstance);
- }
- // copy parent instance user def params to sub process..
- String parentInstanceId = paramMap.get(CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID);
- if(StringUtils.isNotEmpty(parentInstanceId)){
- ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId));
- if(parentInstance != null){
- subProcessInstance.setGlobalParams(
- joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams()));
- this.saveProcessInstance(subProcessInstance);
- }else{
- logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam);
- }
- }
- ProcessInstanceMap processInstanceMap = JSONUtils.parseObject(cmdParam, ProcessInstanceMap.class);
- if(processInstanceMap == null || processInstanceMap.getParentProcessInstanceId() == 0){
- return subProcessInstance;
- }
- // update sub process id to process map table
- processInstanceMap.setProcessInstanceId(subProcessInstance.getId());
- this.updateWorkProcessInstanceMap(processInstanceMap);
- return subProcessInstance;
- }
- /**
- * join parent global params into sub process.
- * only the keys doesn't in sub process global would be joined.
- * @param parentGlobalParams
- * @param subGlobalParams
- * @return
- */
- private String joinGlobalParams(String parentGlobalParams, String subGlobalParams){
- List<Property> parentPropertyList = JSONUtils.toList(parentGlobalParams, Property.class);
- List<Property> subPropertyList = JSONUtils.toList(subGlobalParams, Property.class);
- Map<String,String> subMap = subPropertyList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
- for(Property parent : parentPropertyList){
- if(!subMap.containsKey(parent.getProp())){
- subPropertyList.add(parent);
- }
- }
- return JSONUtils.toJson(subPropertyList);
- }
- /**
- * initialize task instance
- * @param taskInstance
- */
- private void initTaskInstance(TaskInstance taskInstance){
- if(!taskInstance.isSubProcess()){
- if(taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure()){
- taskInstance.setFlag(Flag.NO);
- updateTaskInstance(taskInstance);
- return;
- }
- }
- taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
- updateTaskInstance(taskInstance);
- }
- /**
- * submit task to mysql and task queue
- * submit sub process to command
- * @param taskInstance
- * @return
- */
- @Transactional(value = "TransactionManager",rollbackFor = Exception.class)
- public TaskInstance submitTask(TaskInstance taskInstance, ProcessInstance processInstance){
- logger.info("start submit task : {}, instance id:{}, state: {}, ",
- taskInstance.getName(), processInstance.getId(), processInstance.getState() );
- processInstance = this.findProcessInstanceDetailById(processInstance.getId());
- //submit to mysql
- TaskInstance task= submitTaskInstanceToMysql(taskInstance, processInstance);
- if(task.isSubProcess() && !task.getState().typeIsFinished()){
- ProcessInstanceMap processInstanceMap = setProcessInstanceMap(processInstance, task);
- TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
- Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
- Integer defineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
- createSubWorkProcessCommand(processInstance, processInstanceMap, defineId, task);
- }else if(!task.getState().typeIsFinished()){
- //submit to task queue
- task.setProcessInstancePriority(processInstance.getProcessInstancePriority());
- submitTaskToQueue(task);
- }
- logger.info("submit task :{} state:{} complete, instance id:{} state: {} ",
- taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
- return task;
- }
- /**
- * set work process instance map
- * @param parentInstance
- * @param parentTask
- * @return
- */
- private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask){
- ProcessInstanceMap processMap = findWorkProcessMapByParent(parentInstance.getId(), parentTask.getId());
- if(processMap != null){
- return processMap;
- }else if(parentInstance.getCommandType() == CommandType.REPEAT_RUNNING
- || parentInstance.isComplementData()){
- // update current task id to map
- // repeat running does not generate new sub process instance
- processMap = findPreviousTaskProcessMap(parentInstance, parentTask);
- if(processMap!= null){
- processMap.setParentTaskInstanceId(parentTask.getId());
- updateWorkProcessInstanceMap(processMap);
- return processMap;
- }
- }
- // new task
- processMap = new ProcessInstanceMap();
- processMap.setParentProcessInstanceId(parentInstance.getId());
- processMap.setParentTaskInstanceId(parentTask.getId());
- createWorkProcessInstanceMap(processMap);
- return processMap;
- }
- /**
- * find previous task work process map.
- * @param parentProcessInstance
- * @param parentTask
- * @return
- */
- private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProcessInstance,
- TaskInstance parentTask) {
- Integer preTaskId = 0;
- List<TaskInstance> preTaskList = this.findPreviousTaskListByWorkProcessId(parentProcessInstance.getId());
- for(TaskInstance task : preTaskList){
- if(task.getName().equals(parentTask.getName())){
- preTaskId = task.getId();
- ProcessInstanceMap map = findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId);
- if(map!=null){
- return map;
- }
- }
- }
- logger.info("sub process instance is not found,parent task:{},parent instance:{}",
- parentTask.getId(), parentProcessInstance.getId());
- return null;
- }
- /**
- * create sub work process command
- * @param parentProcessInstance
- * @param instanceMap
- * @param childDefineId
- * @param task
- */
- private void createSubWorkProcessCommand(ProcessInstance parentProcessInstance,
- ProcessInstanceMap instanceMap,
- Integer childDefineId, TaskInstance task){
- ProcessInstance childInstance = findSubProcessInstance(parentProcessInstance.getId(), task.getId());
- CommandType fatherType = parentProcessInstance.getCommandType();
- CommandType commandType = fatherType;
- if(childInstance == null || commandType == CommandType.REPEAT_RUNNING){
- String fatherHistoryCommand = parentProcessInstance.getHistoryCmd();
- // sub process must begin with schedule/complement data
- // if father begin with scheduler/complement data
- if(fatherHistoryCommand.startsWith(CommandType.SCHEDULER.toString()) ||
- fatherHistoryCommand.startsWith(CommandType.COMPLEMENT_DATA.toString())){
- commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]);
- }
- }
- if(childInstance != null){
- childInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
- updateProcessInstance(childInstance);
- }
- // set sub work process command
- String processMapStr = JSONUtils.toJson(instanceMap);
- Map<String, String> cmdParam = JSONUtils.toMap(processMapStr);
- if(commandType == CommandType.COMPLEMENT_DATA ||
- (childInstance != null && childInstance.isComplementData())){
- Map<String, String> parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam());
- String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
- String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime);
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime);
- processMapStr = JSONUtils.toJson(cmdParam);
- }
- Command command = new Command();
- command.setWarningType(parentProcessInstance.getWarningType());
- command.setWarningGroupId(parentProcessInstance.getWarningGroupId());
- command.setFailureStrategy(parentProcessInstance.getFailureStrategy());
- command.setProcessDefinitionId(childDefineId);
- command.setScheduleTime(parentProcessInstance.getScheduleTime());
- command.setExecutorId(parentProcessInstance.getExecutorId());
- command.setCommandParam(processMapStr);
- command.setCommandType(commandType);
- command.setProcessInstancePriority(parentProcessInstance.getProcessInstancePriority());
- createCommand(command);
- logger.info("sub process command created: {} ", command.toString());
- }
- /**
- * submit task to mysql
- * @param taskInstance
- * @return
- */
- public TaskInstance submitTaskInstanceToMysql(TaskInstance taskInstance, ProcessInstance processInstance){
- ExecutionStatus processInstanceState = processInstance.getState();
- if(taskInstance.getState().typeIsFailure()){
- if(taskInstance.isSubProcess()){
- taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1 );
- }else {
- if( processInstanceState != ExecutionStatus.READY_STOP
- && processInstanceState != ExecutionStatus.READY_PAUSE){
- // failure task set invalid
- taskInstance.setFlag(Flag.NO);
- updateTaskInstance(taskInstance);
- // crate new task instance
- if(taskInstance.getState() != ExecutionStatus.NEED_FAULT_TOLERANCE){
- taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1 );
- }
- taskInstance.setEndTime(null);
- taskInstance.setStartTime(new Date());
- taskInstance.setFlag(Flag.YES);
- taskInstance.setHost(null);
- taskInstance.setId(0);
- }
- }
- }
- taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
- taskInstance.setState(getSubmitTaskState(taskInstance, processInstanceState));
- taskInstance.setSubmitTime(new Date());
- saveTaskInstance(taskInstance);
- return taskInstance;
- }
- /**
- * submit task to queue
- * @param task
- */
- public Boolean submitTaskToQueue(TaskInstance task) {
- try{
- // task cannot submit when running
- if(task.getState() == ExecutionStatus.RUNNING_EXEUTION){
- logger.info(String.format("submit to task queue, but task [%s] state already be running. ", task.getName()));
- return true;
- }
- if(checkTaskExistsInTaskQueue(task)){
- logger.info(String.format("submit to task queue, but task [%s] already exists in the queue.", task.getName()));
- return true;
- }
- logger.info("task ready to queue: {}" , task);
- taskQueue.add(SCHEDULER_TASKS_QUEUE, taskZkInfo(task));
- logger.info(String.format("master insert into queue success, task : %s", task.getName()) );
- return true;
- }catch (Exception e){
- logger.error("submit task to queue Exception: ", e);
- logger.error("task queue error : %s", JSONUtils.toJson(task));
- return false;
- }
- }
- /**
- * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${task executed by ip1},${ip2}...
- *
- * The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low.
- *
- * 流程实例优先级_流程实例id_任务优先级_任务id_任务执行机器ip1,ip2... high <- low
- *
- * @param taskInstance
- * @return
- */
- private String taskZkInfo(TaskInstance taskInstance) {
- int taskWorkerGroupId = getTaskWorkerGroupId(taskInstance);
- StringBuilder sb = new StringBuilder(100);
- sb.append(taskInstance.getProcessInstancePriority().ordinal()).append(Constants.UNDERLINE)
- .append(taskInstance.getProcessInstanceId()).append(Constants.UNDERLINE)
- .append(taskInstance.getTaskInstancePriority().ordinal()).append(Constants.UNDERLINE)
- .append(taskInstance.getId()).append(Constants.UNDERLINE);
- if(taskWorkerGroupId > 0){
- //not to find data from db
- WorkerGroup workerGroup = queryWorkerGroupById(taskWorkerGroupId);
- if(workerGroup == null ){
- logger.info("task {} cannot find the worker group, use all worker instead.", taskInstance.getId());
- sb.append(Constants.DEFAULT_WORKER_ID);
- return sb.toString();
- }
- String ips = workerGroup.getIpList();
- if(StringUtils.isBlank(ips)){
- logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers",
- taskInstance.getId(), workerGroup.getId());
- sb.append(Constants.DEFAULT_WORKER_ID);
- return sb.toString();
- }
- StringBuilder ipSb = new StringBuilder(100);
- String[] ipArray = ips.split(COMMA);
- for (String ip : ipArray) {
- long ipLong = IpUtils.ipToLong(ip);
- ipSb.append(ipLong).append(COMMA);
- }
- if(ipSb.length() > 0) {
- ipSb.deleteCharAt(ipSb.length() - 1);
- }
- sb.append(ipSb);
- }else{
- sb.append(Constants.DEFAULT_WORKER_ID);
- }
- return sb.toString();
- }
- /**
- * get submit task instance state by the work process state
- * cannot modify the task state when running/kill/submit success, or this
- * task instance is already exists in task queue .
- * return pause if work process state is ready pause
- * return stop if work process state is ready stop
- * if all of above are not satisfied, return submit success
- *
- * @param taskInstance
- * @param processInstanceState
- * @return
- */
- public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState){
- ExecutionStatus state = taskInstance.getState();
- if(
- // running or killed
- // the task already exists in task queue
- // return state
- state == ExecutionStatus.RUNNING_EXEUTION
- || state == ExecutionStatus.KILL
- || checkTaskExistsInTaskQueue(taskInstance)
- ){
- return state;
- }
- //return pasue /stop if process instance state is ready pause / stop
- // or return submit success
- if( processInstanceState == ExecutionStatus.READY_PAUSE){
- state = ExecutionStatus.PAUSE;
- }else if(processInstanceState == ExecutionStatus.READY_STOP) {
- state = ExecutionStatus.KILL;
- }else{
- state = ExecutionStatus.SUBMITTED_SUCCESS;
- }
- return state;
- }
- /**
- * check the task instance existing in queue
- * @return
- */
- public boolean checkTaskExistsInTaskQueue(TaskInstance task){
- if(task.isSubProcess()){
- return false;
- }
- String taskZkInfo = taskZkInfo(task);
- return taskQueue.checkTaskExists(SCHEDULER_TASKS_QUEUE, taskZkInfo);
- }
- /**
- * create a new process instance
- * @param processInstance
- */
- public void createProcessInstance(ProcessInstance processInstance){
- if (processInstance != null){
- processInstanceMapper.insert(processInstance);
- }
- }
- /**
- * insert or update work process instance to data base
- * @param workProcessInstance
- */
- public void saveProcessInstance(ProcessInstance workProcessInstance){
- if (workProcessInstance == null){
- logger.error("save error, process instance is null!");
- return ;
- }
- //创建流程实例
- if(workProcessInstance.getId() != 0){
- processInstanceMapper.update(workProcessInstance);
- }else{
- createProcessInstance(workProcessInstance);
- }
- }
- /**
- * insert or update command
- * @param command
- * @return
- */
- public int saveCommand(Command command){
- if(command.getId() != 0){
- return commandMapper.update(command);
- }else{
- return commandMapper.insert(command);
- }
- }
- /**
- * insert or update task instance
- * @param taskInstance
- * @return
- */
- public boolean saveTaskInstance(TaskInstance taskInstance){
- if(taskInstance.getId() != 0){
- return updateTaskInstance(taskInstance);
- }else{
- return createTaskInstance(taskInstance);
- }
- }
- /**
- * insert task instance
- * @param taskInstance
- * @return
- */
- public boolean createTaskInstance(TaskInstance taskInstance) {
- int count = taskInstanceMapper.insert(taskInstance);
- return count > 0;
- }
- /**
- * update task instance
- * @param taskInstance
- * @return
- */
- public boolean updateTaskInstance(TaskInstance taskInstance){
- int count = taskInstanceMapper.update(taskInstance);
- return count > 0;
- }
- /**
- * delete a command by id
- * @param id
- */
- public void delCommandByid(int id) {
- commandMapper.delete(id);
- }
- public TaskInstance findTaskInstanceById(Integer taskId){
- return taskInstanceMapper.queryById(taskId);
- }
- /**
- * get id list by task state
- * @param instanceId
- * @param state
- * @return
- */
- public List<Integer> findTaskIdByInstanceState(int instanceId, ExecutionStatus state){
- return taskInstanceMapper.queryTaskByProcessIdAndState(instanceId, state.ordinal());
- }
- /**
- *
- * find valid task list by process definition id
- * @param processInstanceId
- * @return
- */
- public List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId){
- return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES);
- }
- /**
- * find previous task list by work process id
- * @param workProcessInstanceId
- * @return
- */
- public List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer workProcessInstanceId){
- return taskInstanceMapper.findValidTaskListByProcessId(workProcessInstanceId, Flag.NO);
- }
- /**
- * update work process instance map
- * @param processInstanceMap
- * @return
- */
- public int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap){
- return processInstanceMapMapper.update(processInstanceMap);
- }
- /**
- * create work process instance map
- * @param processInstanceMap
- * @return
- */
- public int createWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap){
- Integer count = 0;
- if(processInstanceMap !=null){
- return processInstanceMapMapper.insert(processInstanceMap);
- }
- return count;
- }
- /**
- * find work process map by parent process id and parent task id.
- * @param parentWorkProcessId
- * @param parentTaskId
- * @return
- */
- public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId){
- return processInstanceMapMapper.queryByParentId(parentWorkProcessId, parentTaskId);
- }
- /**
- * delete work process map by parent process id
- * @param parentWorkProcessId
- * @return
- */
- public int deleteWorkProcessMapByParentId(int parentWorkProcessId){
- return processInstanceMapMapper.deleteByParentProcessId(parentWorkProcessId);
- }
- public ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId){
- ProcessInstance processInstance = null;
- ProcessInstanceMap processInstanceMap = processInstanceMapMapper.queryByParentId(parentProcessId, parentTaskId);
- if(processInstanceMap == null || processInstanceMap.getProcessInstanceId() == 0){
- return processInstance;
- }
- processInstance = findProcessInstanceById(processInstanceMap.getProcessInstanceId());
- return processInstance;
- }
- public ProcessInstance findParentProcessInstance(Integer subProcessId) {
- ProcessInstance processInstance = null;
- ProcessInstanceMap processInstanceMap = processInstanceMapMapper.queryBySubProcessId(subProcessId);
- if(processInstanceMap == null || processInstanceMap.getProcessInstanceId() == 0){
- return processInstance;
- }
- processInstance = findProcessInstanceById(processInstanceMap.getParentProcessInstanceId());
- return processInstance;
- }
- /**
- * change task state
- * @param state
- * @param startTime
- * @param host
- * @param executePath
- */
- public void changeTaskState(ExecutionStatus state, Date startTime, String host,
- String executePath,
- String logPath,
- int taskInstId) {
- TaskInstance taskInstance = taskInstanceMapper.queryById(taskInstId);
- taskInstance.setState(state);
- taskInstance.setStartTime(startTime);
- taskInstance.setHost(host);
- taskInstance.setExecutePath(executePath);
- taskInstance.setLogPath(logPath);
- saveTaskInstance(taskInstance);
- }
- /**
- * update process instance
- * @param instance
- * @return
- */
- public int updateProcessInstance(ProcessInstance instance){
- return processInstanceMapper.update(instance);
- }
- /**
- * update the process instance
- * @param processInstanceId
- * @param processJson
- * @param globalParams
- * @param scheduleTime
- * @param flag
- * @param locations
- * @param connects
- * @return
- */
- public int updateProcessInstance(Integer processInstanceId, String processJson,
- String globalParams, Date scheduleTime, Flag flag,
- String locations, String connects){
- return processInstanceMapper.updateProcessInstance(processInstanceId, processJson,
- globalParams, scheduleTime, locations, connects, flag);
- }
- /**
- * change task state
- * @param state
- * @param endTime
- */
- public void changeTaskState(ExecutionStatus state,
- Date endTime,
- int taskInstId) {
- TaskInstance taskInstance = taskInstanceMapper.queryById(taskInstId);
- taskInstance.setState(state);
- taskInstance.setEndTime(endTime);
- saveTaskInstance(taskInstance);
- }
- /**
- * convert integer list to string list
- * @param intList
- * @return
- */
- public List<String> convertIntListToString(List<Integer> intList){
- if(intList == null){
- return new ArrayList<>();
- }
- List<String> result = new ArrayList<String>(intList.size());
- for(Integer intVar : intList){
- result.add(String.valueOf(intVar));
- }
- return result;
- }
- /**
- * set task
- * 根据任务实例id设置pid
- * @param taskInstId
- * @param pid
- */
- public void updatePidByTaskInstId(int taskInstId, int pid) {
- TaskInstance taskInstance = taskInstanceMapper.queryById(taskInstId);
- taskInstance.setPid(pid);
- taskInstance.setAppLink("");
- saveTaskInstance(taskInstance);
- }
- /**
- * update pid and app links field by task instance id
- * @param taskInstId
- * @param pid
- */
- public void updatePidByTaskInstId(int taskInstId, int pid,String appLinks) {
- TaskInstance taskInstance = taskInstanceMapper.queryById(taskInstId);
- taskInstance.setPid(pid);
- taskInstance.setAppLink(appLinks);
- saveTaskInstance(taskInstance);
- }
- /**
- * query ProcessDefinition by name
- *
- * @see ProcessDefinition
- */
- public ProcessDefinition findProcessDefineByName(int projectId, String name) {
- ProcessDefinition projectFlow = processDefineMapper.queryByDefineName(projectId, name);
- return projectFlow;
- }
- /**
- * query Schedule <p>
- *
- * @see Schedule
- */
- public Schedule querySchedule(int id) {
- return scheduleMapper.queryById(id);
- }
- public List<ProcessInstance> queryNeedFailoverProcessInstances(String host){
- return processInstanceMapper.queryByHostAndStatus(host, stateArray);
- }
- /**
- * update host null
- * @param host
- * @return
- */
- public int updateNeddFailoverProcessInstances(String host){
- return processInstanceMapper.setFailoverByHostAndStateArray(host, stateArray);
- }
- /**
- * process need failover process instance
- * @param processInstance
- */
- @Transactional(value = "TransactionManager",rollbackFor = Exception.class)
- public void processNeedFailoverProcessInstances(ProcessInstance processInstance){
- //1 update processInstance host is null
- processInstance.setHost("null");
- processInstanceMapper.update(processInstance);
- //2 insert into recover command
- Command cmd = new Command();
- cmd.setProcessDefinitionId(processInstance.getProcessDefinitionId());
- cmd.setCommandParam(String.format("{\"%s\":%d}", Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
- cmd.setExecutorId(processInstance.getExecutorId());
- cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
- createCommand(cmd);
- }
- /**
- * query all need failover task instances by host
- * @param host
- * @return
- */
- public List<TaskInstance> queryNeedFailoverTaskInstances(String host){
- return taskInstanceMapper.queryByHostAndStatus(host, stateArray);
- }
- /**
- * update host null
- * @param host
- * @return
- */
- public int updateNeedFailoverTaskInstances(String host){
- return taskInstanceMapper.setFailoverByHostAndStateArray(host, stateArray);
- }
- /**
- * find data source by id
- * @param id
- * @return
- */
- public DataSource findDataSourceById(int id){
- return dataSourceMapper.queryById(id);
- }
- /**
- * update process instance state by id
- * @param processInstanceId
- * @param executionStatus
- * @return
- */
- public int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) {
- return processInstanceMapper.updateState(processInstanceId, executionStatus);
- }
- /**
- * find process instance by the task id
- * @param taskId
- * @return
- */
- public ProcessInstance findProcessInstanceByTaskId(int taskId){
- return processInstanceMapper.queryByTaskId(taskId);
- }
- /**
- * find udf function list by id list string
- * @param ids
- * @return
- */
- public List<UdfFunc> queryUdfFunListByids(String ids){
- return udfFuncMapper.queryUdfByIdStr(ids);
- }
- /**
- * find tenant code by resource name
- * @param resName
- * @return
- */
- public String queryTenantCodeByResName(String resName){
- return resourceMapper.queryTenantCodeByResourceName(resName);
- }
- /**
- * find schedule list by process define id.
- * @param ids
- * @return
- */
- public List<Schedule> selectAllByProcessDefineId(int[] ids){
- return scheduleMapper.selectAllByProcessDefineArray(ids);
- }
- /**
- * get dependency cycle by work process define id and scheduler fire time
- *
- * @param masterId
- * @param processDefinitionId
- * @param scheduledFireTime 任务调度预计触发的时间
- * @return
- * @throws Exception
- */
- public CycleDependency getCycleDependency(int masterId, int processDefinitionId, Date scheduledFireTime) throws Exception {
- List<CycleDependency> list = getCycleDependencies(masterId,new int[]{processDefinitionId},scheduledFireTime);
- return list.size()>0 ? list.get(0) : null;
- }
- /**
- *
- * get dependency cycle list by work process define id list and scheduler fire time
- * @param masterId
- * @param ids
- * @param scheduledFireTime 任务调度预计触发的时间
- * @return
- * @throws Exception
- */
- public List<CycleDependency> getCycleDependencies(int masterId,int[] ids,Date scheduledFireTime) throws Exception {
- List<CycleDependency> cycleDependencyList = new ArrayList<CycleDependency>();
- if(ArrayUtils.isEmpty(ids)){
- logger.warn("ids[] is empty!is invalid!");
- return cycleDependencyList;
- }
- if(scheduledFireTime == null){
- logger.warn("scheduledFireTime is null!is invalid!");
- return cycleDependencyList;
- }
- String strCrontab = "";
- CronExpression depCronExpression;
- Cron depCron;
- List<Date> list;
- List<Schedule> schedules = this.selectAllByProcessDefineId(ids);
- // 遍历所有的调度信息
- for(Schedule depSchedule:schedules){
- strCrontab = depSchedule.getCrontab();
- depCronExpression = CronUtils.parse2CronExpression(strCrontab);
- depCron = CronUtils.parse2Cron(strCrontab);
- CycleEnum cycleEnum = CronUtils.getMiniCycle(depCron);
- if(cycleEnum == null){
- logger.error("{} is not valid",strCrontab);
- continue;
- }
- Calendar calendar = Calendar.getInstance();
- switch (cycleEnum){
- /*case MINUTE:
- calendar.add(Calendar.MINUTE,-61);*/
- case HOUR:
- calendar.add(Calendar.HOUR,-25);
- break;
- case DAY:
- calendar.add(Calendar.DATE,-32);
- break;
- case WEEK:
- calendar.add(Calendar.DATE,-32);
- break;
- case MONTH:
- calendar.add(Calendar.MONTH,-13);
- break;
- default:
- logger.warn("Dependent process definition's cycleEnum is {},not support!!", cycleEnum.name());
- continue;
- }
- Date start = calendar.getTime();
- if(depSchedule.getProcessDefinitionId() == masterId){
- list = CronUtils.getSelfFireDateList(start, scheduledFireTime, depCronExpression);
- }else {
- list = CronUtils.getFireDateList(start, scheduledFireTime, depCronExpression);
- }
- if(list.size()>=1){
- start = list.get(list.size()-1);
- CycleDependency dependency = new CycleDependency(depSchedule.getProcessDefinitionId(),start, CronUtils.getExpirationTime(start, cycleEnum), cycleEnum);
- cycleDependencyList.add(dependency);
- }
- }
- return cycleDependencyList;
- }
- /**
- * find process instance by time interval
- * @param defineId
- * @param startTime
- * @param endTime
- * @return
- */
- public ProcessInstance findProcessInstanceByTimeInterval(int defineId, Date startTime, Date endTime, int excludeId) {
- return processInstanceMapper.queryByScheduleTime(defineId, null, excludeId,
- DateUtils.dateToString(startTime), DateUtils.dateToString(endTime));
- }
- public void selfFaultTolerant(int state){
- List<ProcessInstance> processInstanceList = processInstanceMapper.listByStatus(new int[]{state});
- for (ProcessInstance processInstance:processInstanceList){
- selfFaultTolerant(processInstance);
- }
- }
- /**
- * master starup fault tolerant
- */
- public void masterStartupFaultTolerant(){
- int[] readyStopAndKill=new int[]{ExecutionStatus.READY_PAUSE.ordinal(),ExecutionStatus.READY_STOP.ordinal(),
- ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),ExecutionStatus.RUNNING_EXEUTION.ordinal()};
- List<ProcessInstance> processInstanceList = processInstanceMapper.listByStatus(readyStopAndKill);
- for (ProcessInstance processInstance:processInstanceList){
- processNeedFailoverProcessInstances(processInstance);
- }
- }
- @Transactional(value = "TransactionManager",rollbackFor = Exception.class)
- public void selfFaultTolerant(ProcessInstance processInstance){
- processInstance.setState(ExecutionStatus.FAILURE);
- processInstanceMapper.update(processInstance);
- // insert to command
- Command command = new Command();
- command.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
- command.setProcessDefinitionId(processInstance.getProcessDefinitionId());
- command.setCommandParam(String.format("{\"%s\":%d}",
- CMDPARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
- command.setExecutorId(processInstance.getExecutorId());
- command.setProcessInstancePriority(processInstance.getProcessInstancePriority());
- createCommand(command);
- }
- /**
- * find last scheduler process instance in the date interval
- * @param definitionId
- * @param dateInterval
- * @return
- */
- public ProcessInstance findLastSchedulerProcessInterval(int definitionId, DateInterval dateInterval) {
- return processInstanceMapper.queryLastSchedulerProcess(definitionId,
- DateUtils.dateToString(dateInterval.getStartTime()),
- DateUtils.dateToString(dateInterval.getEndTime()));
- }
- public ProcessInstance findLastManualProcessInterval(int definitionId, DateInterval dateInterval) {
- return processInstanceMapper.queryLastManualProcess(definitionId,
- DateUtils.dateToString(dateInterval.getStartTime()),
- DateUtils.dateToString(dateInterval.getEndTime()));
- }
- public ProcessInstance findLastRunningProcess(int definitionId, DateInterval dateInterval) {
- return processInstanceMapper.queryLastRunningProcess(definitionId,
- DateUtils.dateToString(dateInterval.getStartTime()),
- DateUtils.dateToString(dateInterval.getEndTime()),
- stateArray);
- }
- /**
- * query user queue by process instance id
- * @param processInstanceId
- * @return
- */
- public String queryQueueByProcessInstanceId(int processInstanceId){
- return userMapper.queryQueueByProcessInstanceId(processInstanceId);
- }
- /**
- * query worker group by id
- * @param workerGroupId
- * @return
- */
- public WorkerGroup queryWorkerGroupById(int workerGroupId){
- return workerGroupMapper.queryById(workerGroupId);
- }
- /**
- * query worker server by host
- * @param host
- * @return
- */
- public List<WorkerServer> queryWorkerServerByHost(String host){
- return workerServerMapper.queryWorkerByHost(host);
- }
- /**
- * get task worker group id
- *
- * @param taskInstance
- * @return
- */
- public int getTaskWorkerGroupId(TaskInstance taskInstance) {
- int taskWorkerGroupId = taskInstance.getWorkerGroupId();
- ProcessInstance processInstance = findProcessInstanceByTaskId(taskInstance.getId());
- if(processInstance == null){
- logger.error("cannot find the task:{} process instance", taskInstance.getId());
- return Constants.DEFAULT_WORKER_ID;
- }
- int processWorkerGroupId = processInstance.getWorkerGroupId();
- taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId);
- return taskWorkerGroupId;
- }
- }
|