ProcessDao.java 58 KB


  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one or more
  3. * contributor license agreements. See the NOTICE file distributed with
  4. * this work for additional information regarding copyright ownership.
  5. * The ASF licenses this file to You under the Apache License, Version 2.0
  6. * (the "License"); you may not use this file except in compliance with
  7. * the License. You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package cn.escheduler.dao;
  18. import cn.escheduler.common.Constants;
  19. import cn.escheduler.common.enums.*;
  20. import cn.escheduler.common.model.DateInterval;
  21. import cn.escheduler.common.model.TaskNode;
  22. import cn.escheduler.common.queue.ITaskQueue;
  23. import cn.escheduler.common.queue.TaskQueueFactory;
  24. import cn.escheduler.common.task.subprocess.SubProcessParameters;
  25. import cn.escheduler.common.utils.DateUtils;
  26. import cn.escheduler.common.utils.JSONUtils;
  27. import cn.escheduler.common.utils.ParameterUtils;
  28. import cn.escheduler.dao.mapper.*;
  29. import cn.escheduler.dao.model.*;
  30. import cn.escheduler.dao.utils.cron.CronUtils;
  31. import com.alibaba.fastjson.JSONObject;
  32. import com.cronutils.model.Cron;
  33. import org.apache.commons.lang3.ArrayUtils;
  34. import org.apache.commons.lang3.StringUtils;
  35. import org.quartz.CronExpression;
  36. import org.slf4j.Logger;
  37. import org.slf4j.LoggerFactory;
  38. import org.springframework.beans.factory.annotation.Autowired;
  39. import org.springframework.stereotype.Component;
  40. import org.springframework.transaction.annotation.Transactional;
  41. import java.util.*;
  42. import static cn.escheduler.common.Constants.*;
  43. import static cn.escheduler.dao.datasource.ConnectionFactory.getMapper;
  44. /**
  45. * process relative dao that some mappers in this.
  46. */
  47. @Component
  48. public class ProcessDao extends AbstractBaseDao {
  49. private final Logger logger = LoggerFactory.getLogger(getClass());
  50. private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
  51. ExecutionStatus.RUNNING_EXEUTION.ordinal(),
  52. ExecutionStatus.READY_PAUSE.ordinal(),
  53. ExecutionStatus.READY_STOP.ordinal()};
  54. @Autowired
  55. private ProjectMapper projectMapper;
  56. @Autowired
  57. private ProcessDefinitionMapper processDefineMapper;
  58. @Autowired
  59. private ProcessInstanceMapper processInstanceMapper;
  60. @Autowired
  61. private DataSourceMapper dataSourceMapper;
  62. @Autowired
  63. private ProcessInstanceMapMapper processInstanceMapMapper;
  64. @Autowired
  65. private TaskInstanceMapper taskInstanceMapper;
  66. @Autowired
  67. private CommandMapper commandMapper;
  68. @Autowired
  69. private ScheduleMapper scheduleMapper;
  70. @Autowired
  71. private UdfFuncMapper udfFuncMapper;
  72. @Autowired
  73. private ResourceMapper resourceMapper;
  74. /**
  75. * task queue impl
  76. */
  77. protected ITaskQueue taskQueue;
  78. public ProcessDao(){
  79. init();
  80. }
  81. /**
  82. * initialize
  83. */
  84. @Override
  85. protected void init() {
  86. projectMapper = getMapper(ProjectMapper.class);
  87. processDefineMapper = getMapper(ProcessDefinitionMapper.class);
  88. processInstanceMapper = getMapper(ProcessInstanceMapper.class);
  89. dataSourceMapper = getMapper(DataSourceMapper.class);
  90. processInstanceMapMapper = getMapper(ProcessInstanceMapMapper.class);
  91. taskInstanceMapper = getMapper(TaskInstanceMapper.class);
  92. commandMapper = getMapper(CommandMapper.class);
  93. scheduleMapper = getMapper(ScheduleMapper.class);
  94. udfFuncMapper = getMapper(UdfFuncMapper.class);
  95. resourceMapper = getMapper(ResourceMapper.class);
  96. taskQueue = TaskQueueFactory.getTaskQueueInstance();
  97. }
  98. /**
  99. * find one command from command queue, construct process instance
  100. * @param logger
  101. * @param host
  102. * @param vaildThreadNum
  103. * @return
  104. */
  105. @Transactional(value = "TransactionManager",rollbackFor = Exception.class)
  106. public ProcessInstance scanCommand(Logger logger, String host, int vaildThreadNum){
  107. ProcessInstance processInstance = null;
  108. Command command = findOneCommand();
  109. if (command == null) {
  110. return null;
  111. }
  112. logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));
  113. processInstance = constructProcessInstance(command, host);
  114. //cannot construct process instance, return null;
  115. if(processInstance == null){
  116. logger.error("scan command, command parameter is error: %s", command.toString());
  117. }else{
  118. // check thread number enough for this command, if not, change state to waiting thread.
  119. int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId());
  120. if(vaildThreadNum < commandThreadCount){
  121. logger.info("there is not enough thread for this command: {}",command.toString() );
  122. processInstance.setState(ExecutionStatus.WAITTING_THREAD);
  123. if(command.getCommandType() != CommandType.RECOVER_WAITTING_THREAD){
  124. processInstance.addHistoryCmd(command.getCommandType());
  125. }
  126. saveProcessInstance(processInstance);
  127. this.setSubProcessParam(processInstance);
  128. createRecoveryWaitingThreadCommand(command, processInstance);
  129. return null;
  130. }else{
  131. processInstance.setCommandType(command.getCommandType());
  132. processInstance.addHistoryCmd(command.getCommandType());
  133. saveProcessInstance(processInstance);
  134. this.setSubProcessParam(processInstance);
  135. }
  136. }
  137. // delete command
  138. delCommandByid(command.getId());
  139. return processInstance;
  140. }
  141. /**
  142. * insert one command
  143. */
  144. public int createCommand(Command command) {
  145. int result = 0;
  146. if (command != null){
  147. result = commandMapper.insert(command);
  148. }
  149. return result;
  150. }
  151. /**
  152. *
  153. * find one command from queue list
  154. * @return
  155. */
  156. public Command findOneCommand(){
  157. return commandMapper.queryOneCommand();
  158. }
  159. /**
  160. * check the input command exists in queue list
  161. * @param command
  162. * @return
  163. */
  164. public Boolean verifyIsNeedCreateCommand(Command command){
  165. Boolean isNeedCreate = true;
  166. Map<CommandType,Integer> cmdTypeMap = new HashMap<CommandType,Integer>();
  167. cmdTypeMap.put(CommandType.REPEAT_RUNNING,1);
  168. cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS,1);
  169. cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS,1);
  170. CommandType commandType = command.getCommandType();
  171. if(cmdTypeMap.containsKey(commandType)){
  172. JSONObject cmdParamObj = (JSONObject) JSONObject.parse(command.getCommandParam());
  173. JSONObject tempObj;
  174. int processInstanceId = cmdParamObj.getInteger(CMDPARAM_RECOVER_PROCESS_ID_STRING);
  175. List<Command> commands = commandMapper.queryAllCommand();
  176. //遍历所有命令
  177. for (Command tmpCommand:commands){
  178. if(cmdTypeMap.containsKey(tmpCommand.getCommandType())){
  179. tempObj = (JSONObject) JSONObject.parse(tmpCommand.getCommandParam());
  180. if(tempObj != null && processInstanceId == tempObj.getInteger(CMDPARAM_RECOVER_PROCESS_ID_STRING)){
  181. isNeedCreate = false;
  182. break;
  183. }
  184. }
  185. }
  186. }
  187. return isNeedCreate;
  188. }
  189. /**
  190. * find process instance detail by id
  191. * @param processId
  192. * @return
  193. */
  194. public ProcessInstance findProcessInstanceDetailById(int processId){
  195. return processInstanceMapper.queryDetailById(processId);
  196. }
  197. /**
  198. * find process instance by id
  199. * @param processId
  200. * @return
  201. */
  202. public ProcessInstance findProcessInstanceById(int processId){
  203. return processInstanceMapper.queryById(processId);
  204. }
  205. /**
  206. * find process instance by scheduler time.
  207. * @param defineId
  208. * @param scheduleTime
  209. * @return
  210. */
  211. public ProcessInstance findProcessInstanceByScheduleTime(int defineId, Date scheduleTime){
  212. return processInstanceMapper.queryByScheduleTime(defineId,
  213. DateUtils.dateToString(scheduleTime), 0,null, null);
  214. }
  215. /**
  216. * find process define by id.
  217. * @param processDefinitionId
  218. * @return
  219. */
  220. public ProcessDefinition findProcessDefineById(int processDefinitionId) {
  221. return processDefineMapper.queryByDefineId(processDefinitionId);
  222. }
  223. /**
  224. * delete work process instance by id
  225. * @param processInstanceId
  226. * @return
  227. */
  228. public int deleteWorkProcessInstanceById(int processInstanceId){
  229. return processInstanceMapper.delete(processInstanceId);
  230. }
  231. /**
  232. *
  233. * delete all sub process by parent instance id
  234. * @return
  235. */
  236. public int deleteAllSubWorkProcessByParentId(int processInstanceId){
  237. List<Integer> subProcessIdList = processInstanceMapper.querySubIdListByParentId(processInstanceId);
  238. for(Integer subId : subProcessIdList ){
  239. deleteAllSubWorkProcessByParentId(subId);
  240. deleteWorkProcessMapByParentId(subId);
  241. deleteWorkProcessInstanceById(subId);
  242. }
  243. return 1;
  244. }
  245. /**
  246. * create process define
  247. * @param processDefinition
  248. * @return
  249. */
  250. public int createProcessDefine(ProcessDefinition processDefinition){
  251. int count = 0;
  252. if(processDefinition != null){
  253. count = this.processDefineMapper.insert(processDefinition);
  254. }
  255. return count;
  256. }
  257. /**
  258. * calculate sub process number in the process define.
  259. * @param processDefinitionId
  260. * @return
  261. */
  262. private Integer workProcessThreadNumCount(Integer processDefinitionId){
  263. List<String> ids = new ArrayList<>();
  264. recurseFindSubProcessId(processDefinitionId, ids);
  265. return ids.size()+1;
  266. }
  267. /**
  268. * recursive query sub process definition id by parent id.
  269. * @param parentId
  270. * @param ids
  271. */
  272. public void recurseFindSubProcessId(int parentId, List<String> ids){
  273. ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(parentId);
  274. String processDefinitionJson = processDefinition.getProcessDefinitionJson();
  275. ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
  276. List<TaskNode> taskNodeList = processData.getTasks();
  277. if (taskNodeList != null && taskNodeList.size() > 0){
  278. for (TaskNode taskNode : taskNodeList){
  279. String parameter = taskNode.getParams();
  280. if (parameter.contains(CMDPARAM_SUB_PROCESS_DEFINE_ID)){
  281. SubProcessParameters subProcessParam = JSONObject.parseObject(parameter, SubProcessParameters.class);
  282. ids.add(String.valueOf(subProcessParam.getProcessDefinitionId()));
  283. recurseFindSubProcessId(subProcessParam.getProcessDefinitionId(),ids);
  284. }
  285. }
  286. }
  287. }
  288. /**
  289. * create recovery waiting thread command when thread pool is not enough for the process instance.
  290. * sub work process instance need not to create recovery command.
  291. * create recovery waiting thread command and delete origin command at the same time.
  292. * if the recovery command is exists, only update the field update_time
  293. * @param originCommand
  294. * @param processInstance
  295. */
  296. public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) {
  297. // sub process doesnot need to create wait command
  298. if(processInstance.getIsSubProcess() == Flag.YES){
  299. if(originCommand != null){
  300. commandMapper.delete(originCommand.getId());
  301. }
  302. return;
  303. }
  304. Map<String, String> cmdParam = new HashMap<>();
  305. cmdParam.put(Constants.CMDPARAM_RECOVERY_WAITTING_THREAD, String.valueOf(processInstance.getId()));
  306. // process instance quit by "waiting thread" state
  307. if(originCommand == null){
  308. Command command = new Command(
  309. CommandType.RECOVER_WAITTING_THREAD,
  310. processInstance.getTaskDependType(),
  311. processInstance.getFailureStrategy(),
  312. processInstance.getExecutorId(),
  313. processInstance.getProcessDefinitionId(),
  314. JSONUtils.toJson(cmdParam),
  315. processInstance.getWarningType(),
  316. processInstance.getWarningGroupId(),
  317. processInstance.getScheduleTime(),
  318. processInstance.getProcessInstancePriority()
  319. );
  320. saveCommand(command);
  321. return ;
  322. }
  323. // update the command time if current command if recover from waiting
  324. if(originCommand.getCommandType() == CommandType.RECOVER_WAITTING_THREAD){
  325. originCommand.setUpdateTime(new Date());
  326. saveCommand(originCommand);
  327. }else{
  328. // delete old command and create new waiting thread command
  329. commandMapper.delete(originCommand.getId());
  330. originCommand.setId(0);
  331. originCommand.setCommandType(CommandType.RECOVER_WAITTING_THREAD);
  332. originCommand.setUpdateTime(new Date());
  333. originCommand.setCommandParam(JSONUtils.toJson(cmdParam));
  334. originCommand.setProcessInstancePriority(processInstance.getProcessInstancePriority());
  335. saveCommand(originCommand);
  336. }
  337. }
  338. /**
  339. * get schedule time from command
  340. * @param command
  341. * @param cmdParam
  342. * @return
  343. */
  344. private Date getScheduleTime(Command command, Map<String, String> cmdParam){
  345. Date scheduleTime = command.getScheduleTime();
  346. if(scheduleTime == null){
  347. if(cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)){
  348. scheduleTime = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
  349. }
  350. }
  351. return scheduleTime;
  352. }
  353. /**
  354. * generate a new work process instance from command.
  355. * @param processDefinition
  356. * @param command
  357. * @param cmdParam
  358. * @return
  359. */
  360. private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition,
  361. Command command,
  362. Map<String, String> cmdParam){
  363. ProcessInstance processInstance = new ProcessInstance(processDefinition);
  364. processInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
  365. processInstance.setRecovery(Flag.NO);
  366. processInstance.setStartTime(new Date());
  367. processInstance.setRunTimes(1);
  368. processInstance.setMaxTryTimes(0);
  369. processInstance.setProcessDefinitionId(command.getProcessDefinitionId());
  370. processInstance.setCommandParam(command.getCommandParam());
  371. processInstance.setCommandType(command.getCommandType());
  372. processInstance.setIsSubProcess(Flag.NO);
  373. processInstance.setTaskDependType(command.getTaskDependType());
  374. processInstance.setFailureStrategy(command.getFailureStrategy());
  375. processInstance.setExecutorId(command.getExecutorId());
  376. WarningType warningType = command.getWarningType() == null ? WarningType.NONE : command.getWarningType();
  377. processInstance.setWarningType(warningType);
  378. Integer warningGroupId = command.getWarningGroupId() == null ? 0 : command.getWarningGroupId();
  379. processInstance.setWarningGroupId(warningGroupId);
  380. // schedule time
  381. Date scheduleTime = getScheduleTime(command, cmdParam);
  382. if(scheduleTime != null){
  383. processInstance.setScheduleTime(scheduleTime);
  384. }
  385. processInstance.setCommandStartTime(command.getStartTime());
  386. processInstance.setLocations(processDefinition.getLocations());
  387. processInstance.setConnects(processDefinition.getConnects());
  388. // curing global params
  389. processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
  390. processDefinition.getGlobalParamMap(),
  391. processDefinition.getGlobalParamList(),
  392. getCommandTypeIfComplement(processInstance, command),
  393. processInstance.getScheduleTime()));
  394. //copy process define json to process instance
  395. processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson());
  396. // set process instance priority
  397. processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
  398. return processInstance;
  399. }
  400. /**
  401. * check command parameters is valid
  402. * @param command
  403. * @param cmdParam
  404. * @return
  405. */
  406. private Boolean checkCmdParam(Command command, Map<String, String> cmdParam){
  407. if(command.getTaskDependType() == TaskDependType.TASK_ONLY || command.getTaskDependType()== TaskDependType.TASK_PRE){
  408. if(cmdParam == null
  409. || !cmdParam.containsKey(Constants.CMDPARAM_START_NODE_NAMES)
  410. || cmdParam.get(Constants.CMDPARAM_START_NODE_NAMES).isEmpty()){
  411. logger.error(String.format("command node depend type is %s, but start nodes is null ", command.getTaskDependType().toString()));
  412. return false;
  413. }
  414. }
  415. return true;
  416. }
  417. /**
  418. * construct process instance according to one command.
  419. * @param command
  420. * @param host
  421. * @return
  422. */
  423. private ProcessInstance constructProcessInstance(Command command, String host){
  424. ProcessInstance processInstance = null;
  425. CommandType commandType = command.getCommandType();
  426. Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
  427. ProcessDefinition processDefinition = null;
  428. if(command.getProcessDefinitionId() != 0){
  429. processDefinition = processDefineMapper.queryByDefineId(command.getProcessDefinitionId());
  430. if(processDefinition == null){
  431. logger.error(String.format("cannot find the work process define! define id : %d", command.getProcessDefinitionId()));
  432. return null;
  433. }
  434. }
  435. if(cmdParam != null ){
  436. Integer processInstanceId = 0;
  437. // recover from failure or pause tasks
  438. if(cmdParam.containsKey(Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING)) {
  439. String processId = cmdParam.get(Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING);
  440. processInstanceId = Integer.parseInt(processId);
  441. if (processInstanceId == 0) {
  442. logger.error("command parameter is error, [ ProcessInstanceId ] is 0");
  443. return null;
  444. }
  445. }else if(cmdParam.containsKey(Constants.CMDPARAM_SUB_PROCESS)){
  446. // sub process map
  447. String pId = cmdParam.get(Constants.CMDPARAM_SUB_PROCESS);
  448. processInstanceId = Integer.parseInt(pId);
  449. }else if(cmdParam.containsKey(Constants.CMDPARAM_RECOVERY_WAITTING_THREAD)){
  450. // waiting thread command
  451. String pId = cmdParam.get(Constants.CMDPARAM_RECOVERY_WAITTING_THREAD);
  452. processInstanceId = Integer.parseInt(pId);
  453. }
  454. if(processInstanceId ==0){
  455. processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
  456. }else{
  457. processInstance = this.findProcessInstanceDetailById(processInstanceId);
  458. }
  459. processDefinition = processDefineMapper.queryByDefineId(processInstance.getProcessDefinitionId());
  460. processInstance.setProcessDefinition(processDefinition);
  461. //reset command parameter
  462. if(processInstance.getCommandParam() != null){
  463. Map<String, String> processCmdParam = JSONUtils.toMap(processInstance.getCommandParam());
  464. for(String key : processCmdParam.keySet()){
  465. if(!cmdParam.containsKey(key)){
  466. cmdParam.put(key,processCmdParam.get(key));
  467. }
  468. }
  469. }
  470. // reset command parameter if sub process
  471. if(cmdParam.containsKey(Constants.CMDPARAM_SUB_PROCESS)){
  472. processInstance.setCommandParam(command.getCommandParam());
  473. }
  474. }else{
  475. // generate one new process instance
  476. processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
  477. }
  478. if(!checkCmdParam(command, cmdParam)){
  479. logger.error("command parameter check failed!");
  480. return null;
  481. }
  482. if(command.getScheduleTime() != null){
  483. processInstance.setScheduleTime(command.getScheduleTime());
  484. }
  485. processInstance.setHost(host);
  486. int runTime = processInstance.getRunTimes();
  487. switch (commandType){
  488. case START_PROCESS:
  489. break;
  490. case START_FAILURE_TASK_PROCESS:
  491. // find failed tasks and init these tasks
  492. List<Integer> failedList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.FAILURE);
  493. List<Integer> killedList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.KILL);
  494. cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);
  495. failedList.addAll(killedList);
  496. for(Integer taskId : failedList){
  497. initTaskInstance(this.findTaskInstanceById(taskId));
  498. }
  499. cmdParam.put(Constants.CMDPARAM_RECOVERY_START_NODE_STRING,
  500. String.join(Constants.COMMA, convertIntListToString(failedList)));
  501. processInstance.setCommandParam(JSONUtils.toJson(cmdParam));
  502. processInstance.setRunTimes(runTime +1 );
  503. break;
  504. case START_CURRENT_TASK_PROCESS:
  505. break;
  506. case RECOVER_WAITTING_THREAD:
  507. break;
  508. case RECOVER_SUSPENDED_PROCESS:
  509. // find pause tasks and init task's state
  510. cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);
  511. List<Integer> suspendedNodeList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE);
  512. for(Integer taskId : suspendedNodeList){
  513. // 把暂停状态初始化
  514. initTaskInstance(this.findTaskInstanceById(taskId));
  515. }
  516. cmdParam.put(Constants.CMDPARAM_RECOVERY_START_NODE_STRING, String.join(",", convertIntListToString(suspendedNodeList)));
  517. processInstance.setCommandParam(JSONUtils.toJson(cmdParam));
  518. processInstance.setRunTimes(runTime +1);
  519. break;
  520. case RECOVER_TOLERANCE_FAULT_PROCESS:
  521. // recover tolerance fault process
  522. processInstance.setRecovery(Flag.YES);
  523. break;
  524. case COMPLEMENT_DATA:
  525. // delete all the valid tasks when complement data
  526. List<TaskInstance> taskInstanceList = this.findValidTaskListByProcessId(processInstance.getId());
  527. for(TaskInstance taskInstance : taskInstanceList){
  528. taskInstance.setFlag(Flag.NO);
  529. this.updateTaskInstance(taskInstance);
  530. }
  531. break;
  532. case REPEAT_RUNNING:
  533. // delete the recover task names from command parameter
  534. if(cmdParam.containsKey(Constants.CMDPARAM_RECOVERY_START_NODE_STRING)){
  535. cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);
  536. processInstance.setCommandParam(JSONUtils.toJson(cmdParam));
  537. }
  538. // delete all the valid tasks when repeat running
  539. List<TaskInstance> validTaskList = findValidTaskListByProcessId(processInstance.getId());
  540. for(TaskInstance taskInstance : validTaskList){
  541. taskInstance.setFlag(Flag.NO);
  542. updateTaskInstance(taskInstance);
  543. }
  544. processInstance.setStartTime(new Date());
  545. processInstance.setEndTime(null);
  546. processInstance.setRunTimes(runTime +1);
  547. initComplementDataParam(processDefinition, processInstance, cmdParam);
  548. break;
  549. case SCHEDULER:
  550. break;
  551. default:
  552. break;
  553. }
  554. processInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
  555. return processInstance;
  556. }
  557. /**
  558. * return complement data if the process start with complement data
  559. */
  560. private CommandType getCommandTypeIfComplement(ProcessInstance processInstance, Command command){
  561. if(CommandType.COMPLEMENT_DATA == processInstance.getCmdTypeIfComplement()){
  562. return CommandType.COMPLEMENT_DATA;
  563. }else{
  564. return command.getCommandType();
  565. }
  566. }
  567. /**
  568. * initialize complement data parameters
  569. * @param processDefinition
  570. * @param processInstance
  571. * @param cmdParam
  572. */
  573. private void initComplementDataParam(ProcessDefinition processDefinition, ProcessInstance processInstance, Map<String, String> cmdParam) {
  574. if(!processInstance.isComplementData()){
  575. return;
  576. }
  577. Date startComplementTime = DateUtils.parse(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE),
  578. YYYY_MM_DD_HH_MM_SS);
  579. processInstance.setScheduleTime(startComplementTime);
  580. processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
  581. processDefinition.getGlobalParamMap(),
  582. processDefinition.getGlobalParamList(),
  583. CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
  584. }
  585. /**
  586. * set sub work process parameters.
  587. * handle sub work process instance, update relation table and command parameters
  588. * set sub work process flag, extends parent work process command parameters.
  589. */
  590. public ProcessInstance setSubProcessParam(ProcessInstance processInstance){
  591. String cmdParam = processInstance.getCommandParam();
  592. if(StringUtils.isEmpty(cmdParam)){
  593. return processInstance;
  594. }
  595. Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
  596. // write sub process id into cmd param.
  597. if(paramMap.containsKey(CMDPARAM_SUB_PROCESS)
  598. && CMDPARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMDPARAM_SUB_PROCESS))){
  599. paramMap.remove(CMDPARAM_SUB_PROCESS);
  600. paramMap.put(CMDPARAM_SUB_PROCESS, String.valueOf(processInstance.getId()));
  601. processInstance.setCommandParam(JSONUtils.toJson(paramMap));
  602. processInstance.setIsSubProcess(Flag.YES);
  603. this.updateProcessInstance(processInstance);
  604. }
  605. // copy parent instance user def params to sub process..
  606. String parentInstanceId = paramMap.get(CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID);
  607. if(StringUtils.isNotEmpty(parentInstanceId)){
  608. ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId));
  609. if(parentInstance != null){
  610. processInstance.setGlobalParams(parentInstance.getGlobalParams());
  611. this.updateProcessInstance(processInstance);
  612. }else{
  613. logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam);
  614. }
  615. }
  616. ProcessInstanceMap processInstanceMap = JSONUtils.parseObject(cmdParam, ProcessInstanceMap.class);
  617. if(processInstanceMap == null || processInstanceMap.getParentProcessInstanceId() == 0){
  618. return processInstance;
  619. }
  620. // update sub process id to process map table
  621. processInstanceMap.setProcessInstanceId(processInstance.getId());
  622. this.updateWorkProcessInstanceMap(processInstanceMap);
  623. return processInstance;
  624. }
  625. /**
  626. * initialize task instance
  627. * @param taskInstance
  628. */
  629. private void initTaskInstance(TaskInstance taskInstance){
  630. if(taskInstance.getState().typeIsFailure() && !taskInstance.isSubProcess()){
  631. taskInstance.setFlag(Flag.NO);
  632. updateTaskInstance(taskInstance);
  633. }else{
  634. taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
  635. updateTaskInstance(taskInstance);
  636. }
  637. }
  638. /**
  639. * submit task to mysql and task queue
  640. * submit sub process to command
  641. * @param taskInstance
  642. * @return
  643. */
  644. @Transactional(value = "TransactionManager",rollbackFor = Exception.class)
  645. public TaskInstance submitTask(TaskInstance taskInstance, ProcessInstance processInstance){
  646. logger.info("start submit task : {}, instance id:{}, state: {}, ",
  647. taskInstance.getName(), processInstance.getId(), processInstance.getState() );
  648. processInstance = this.findProcessInstanceDetailById(processInstance.getId());
  649. //submit to mysql
  650. TaskInstance task= submitTaskInstanceToMysql(taskInstance, processInstance);
  651. if(task.isSubProcess() && !task.getState().typeIsFinished()){
  652. ProcessInstanceMap processInstanceMap = setProcessInstanceMap(processInstance, task);
  653. TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
  654. Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
  655. Integer defineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
  656. createSubWorkProcessCommand(processInstance, processInstanceMap, defineId, task);
  657. }else if(!task.getState().typeIsFinished()){
  658. //submit to task queue
  659. task.setProcessInstancePriority(processInstance.getProcessInstancePriority());
  660. submitTaskToQueue(task);
  661. }
  662. logger.info("submit task :{} state:{} complete, instance id:{} state: {} ",
  663. taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
  664. return task;
  665. }
  666. /**
  667. * set work process instance map
  668. * @param parentInstance
  669. * @param parentTask
  670. * @return
  671. */
  672. private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask){
  673. ProcessInstanceMap processMap = findWorkProcessMapByParent(parentInstance.getId(), parentTask.getId());
  674. if(processMap != null){
  675. return processMap;
  676. }else if(parentInstance.getCommandType() == CommandType.REPEAT_RUNNING
  677. || parentInstance.isComplementData()){
  678. // update current task id to map
  679. // repeat running does not generate new sub process instance
  680. processMap = findPreviousTaskProcessMap(parentInstance, parentTask);
  681. if(processMap!= null){
  682. processMap.setParentTaskInstanceId(parentTask.getId());
  683. updateWorkProcessInstanceMap(processMap);
  684. return processMap;
  685. }
  686. }
  687. // new task
  688. processMap = new ProcessInstanceMap();
  689. processMap.setParentProcessInstanceId(parentInstance.getId());
  690. processMap.setParentTaskInstanceId(parentTask.getId());
  691. createWorkProcessInstanceMap(processMap);
  692. return processMap;
  693. }
  694. /**
  695. * find previous task work process map.
  696. * @param parentProcessInstance
  697. * @param parentTask
  698. * @return
  699. */
  700. private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProcessInstance,
  701. TaskInstance parentTask) {
  702. Integer preTaskId = 0;
  703. List<TaskInstance> preTaskList = this.findPreviousTaskListByWorkProcessId(parentProcessInstance.getId());
  704. for(TaskInstance task : preTaskList){
  705. if(task.getName().equals(parentTask.getName())){
  706. preTaskId = task.getId();
  707. ProcessInstanceMap map = findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId);
  708. if(map!=null){
  709. return map;
  710. }
  711. }
  712. }
  713. logger.info("sub process instance is not found,parent task:{},parent instance:{}",
  714. parentTask.getId(), parentProcessInstance.getId());
  715. return null;
  716. }
  717. /**
  718. * create sub work process command
  719. * @param parentProcessInstance
  720. * @param instanceMap
  721. * @param childDefineId
  722. * @param task
  723. */
  724. private void createSubWorkProcessCommand(ProcessInstance parentProcessInstance,
  725. ProcessInstanceMap instanceMap,
  726. Integer childDefineId, TaskInstance task){
  727. ProcessInstance childInstance = findSubProcessInstance(parentProcessInstance.getId(), task.getId());
  728. CommandType fatherType = parentProcessInstance.getCommandType();
  729. CommandType commandType = fatherType;
  730. if(childInstance == null || commandType == CommandType.REPEAT_RUNNING){
  731. String fatherHistoryCommand = parentProcessInstance.getHistoryCmd();
  732. // sub process must begin with schedule/complement data
  733. // if father begin with scheduler/complement data
  734. if(fatherHistoryCommand.startsWith(CommandType.SCHEDULER.toString()) ||
  735. fatherHistoryCommand.startsWith(CommandType.COMPLEMENT_DATA.toString())){
  736. commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]);
  737. }
  738. }
  739. if(childInstance != null){
  740. childInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
  741. updateProcessInstance(childInstance);
  742. }
  743. // set sub work process command
  744. String processMapStr = JSONUtils.toJson(instanceMap);
  745. Map<String, String> cmdParam = JSONUtils.toMap(processMapStr);
  746. if(commandType == CommandType.COMPLEMENT_DATA ||
  747. (childInstance != null && childInstance.isComplementData())){
  748. Map<String, String> parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam());
  749. String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
  750. String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
  751. cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime);
  752. cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime);
  753. processMapStr = JSONUtils.toJson(cmdParam);
  754. }
  755. Command command = new Command();
  756. command.setWarningType(parentProcessInstance.getWarningType());
  757. command.setWarningGroupId(parentProcessInstance.getWarningGroupId());
  758. command.setFailureStrategy(parentProcessInstance.getFailureStrategy());
  759. command.setProcessDefinitionId(childDefineId);
  760. command.setScheduleTime(parentProcessInstance.getScheduleTime());
  761. command.setExecutorId(parentProcessInstance.getExecutorId());
  762. command.setCommandParam(processMapStr);
  763. command.setCommandType(commandType);
  764. command.setProcessInstancePriority(parentProcessInstance.getProcessInstancePriority());
  765. createCommand(command);
  766. logger.info("sub process command created: {} ", command.toString());
  767. }
  768. /**
  769. * submit task to mysql
  770. * @param taskInstance
  771. * @return
  772. */
  773. public TaskInstance submitTaskInstanceToMysql(TaskInstance taskInstance, ProcessInstance processInstance){
  774. ExecutionStatus processInstanceState = processInstance.getState();
  775. if(taskInstance.getState().typeIsFailure()){
  776. if(taskInstance.isSubProcess()){
  777. taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1 );
  778. }else {
  779. if( processInstanceState != ExecutionStatus.READY_STOP
  780. && processInstanceState != ExecutionStatus.READY_PAUSE){
  781. // failure task set invalid
  782. taskInstance.setFlag(Flag.NO);
  783. updateTaskInstance(taskInstance);
  784. // crate new task instance
  785. taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1 );
  786. taskInstance.setFlag(Flag.YES);
  787. taskInstance.setHost(null);
  788. taskInstance.setId(0);
  789. }
  790. }
  791. }
  792. taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
  793. taskInstance.setState(getSubmitTaskState(taskInstance, processInstanceState));
  794. taskInstance.setSubmitTime(new Date());
  795. saveTaskInstance(taskInstance);
  796. return taskInstance;
  797. }
  798. /**
  799. * submit task to queue
  800. * @param task
  801. */
  802. public Boolean submitTaskToQueue(TaskInstance task) {
  803. try{
  804. // task cannot submit when running
  805. if(task.getState() == ExecutionStatus.RUNNING_EXEUTION){
  806. logger.info(String.format("submit to task queue, but task [%s] state already be running. ", task.getName()));
  807. return true;
  808. }
  809. if(checkTaskExistsInTaskQueue(task)){
  810. logger.info(String.format("submit to task queue, but task [%s] already exists in the queue.", task.getName()));
  811. return true;
  812. }
  813. logger.info("task ready to queue: {}" , task);
  814. taskQueue.add(SCHEDULER_TASKS_QUEUE, taskZkInfo(task));
  815. logger.info(String.format("master insert into queue success, task : %s", task.getName()) );
  816. return true;
  817. }catch (Exception e){
  818. logger.error("submit task to queue Exception: ", e);
  819. logger.error("task queue error : %s", JSONUtils.toJson(task));
  820. return false;
  821. }
  822. }
  823. /**
  824. * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
  825. *
  826. * The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low.
  827. *
  828. * 流程实例优先级_流程实例id_任务优先级_任务id high <- low
  829. *
  830. * @param task
  831. * @return
  832. */
  833. private String taskZkInfo(TaskInstance task) {
  834. return String.valueOf(task.getProcessInstancePriority().ordinal()) + Constants.UNDERLINE + task.getProcessInstanceId() + Constants.UNDERLINE + task.getTaskInstancePriority().ordinal() + Constants.UNDERLINE + task.getId();
  835. }
  836. /**
  837. * get submit task instance state by the work process state
  838. * cannot modify the task state when running/kill/submit success, or this
  839. * task instance is already exists in task queue .
  840. * return pause if work process state is ready pause
  841. * return stop if work process state is ready stop
  842. * if all of above are not satisfied, return submit success
  843. *
  844. * @param taskInstance
  845. * @param processInstanceState
  846. * @return
  847. */
  848. public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState){
  849. ExecutionStatus state = taskInstance.getState();
  850. if(
  851. // running or killed
  852. // the task already exists in task queue
  853. // return state
  854. state == ExecutionStatus.RUNNING_EXEUTION
  855. || state == ExecutionStatus.KILL
  856. || checkTaskExistsInTaskQueue(taskInstance)
  857. ){
  858. return state;
  859. }
  860. //return pasue /stop if process instance state is ready pause / stop
  861. // or return submit success
  862. if( processInstanceState == ExecutionStatus.READY_PAUSE){
  863. state = ExecutionStatus.PAUSE;
  864. }else if(processInstanceState == ExecutionStatus.READY_STOP) {
  865. state = ExecutionStatus.KILL;
  866. }else{
  867. state = ExecutionStatus.SUBMITTED_SUCCESS;
  868. }
  869. return state;
  870. }
  871. /**
  872. * check the task instance existing in queue
  873. * @return
  874. */
  875. public boolean checkTaskExistsInTaskQueue(TaskInstance task){
  876. if(task.isSubProcess()){
  877. return false;
  878. }
  879. String taskZkInfo = taskZkInfo(task);
  880. return taskQueue.checkTaskExists(SCHEDULER_TASKS_QUEUE, taskZkInfo);
  881. }
  882. /**
  883. * create a new process instance
  884. * @param processInstance
  885. */
  886. public void createProcessInstance(ProcessInstance processInstance){
  887. if (processInstance != null){
  888. processInstanceMapper.insert(processInstance);
  889. }
  890. }
  891. /**
  892. * insert or update work process instance to data base
  893. * @param workProcessInstance
  894. */
  895. public void saveProcessInstance(ProcessInstance workProcessInstance){
  896. if (workProcessInstance == null){
  897. logger.error("save error, process instance is null!");
  898. return ;
  899. }
  900. //创建流程实例
  901. if(workProcessInstance.getId() != 0){
  902. processInstanceMapper.update(workProcessInstance);
  903. }else{
  904. createProcessInstance(workProcessInstance);
  905. }
  906. }
  907. /**
  908. * insert or update command
  909. * @param command
  910. * @return
  911. */
  912. public int saveCommand(Command command){
  913. if(command.getId() != 0){
  914. return commandMapper.update(command);
  915. }else{
  916. return commandMapper.insert(command);
  917. }
  918. }
  919. /**
  920. * insert or update task instance
  921. * @param taskInstance
  922. * @return
  923. */
  924. public boolean saveTaskInstance(TaskInstance taskInstance){
  925. if(taskInstance.getId() != 0){
  926. return updateTaskInstance(taskInstance);
  927. }else{
  928. return createTaskInstance(taskInstance);
  929. }
  930. }
  931. /**
  932. * insert task instance
  933. * @param taskInstance
  934. * @return
  935. */
  936. public boolean createTaskInstance(TaskInstance taskInstance) {
  937. int count = taskInstanceMapper.insert(taskInstance);
  938. return count > 0;
  939. }
  940. /**
  941. * update task instance
  942. * @param taskInstance
  943. * @return
  944. */
  945. public boolean updateTaskInstance(TaskInstance taskInstance){
  946. int count = taskInstanceMapper.update(taskInstance);
  947. return count > 0;
  948. }
  949. /**
  950. * delete a command by id
  951. * @param id
  952. */
  953. public void delCommandByid(int id) {
  954. commandMapper.delete(id);
  955. }
  956. public TaskInstance findTaskInstanceById(Integer taskId){
  957. return taskInstanceMapper.queryById(taskId);
  958. }
  959. /**
  960. * get id list by task state
  961. * @param instanceId
  962. * @param state
  963. * @return
  964. */
  965. public List<Integer> findTaskIdByInstanceState(int instanceId, ExecutionStatus state){
  966. return taskInstanceMapper.queryTaskByProcessIdAndState(instanceId, state.ordinal());
  967. }
  968. /**
  969. *
  970. * find valid task list by process definition id
  971. * @param processInstanceId
  972. * @return
  973. */
  974. public List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId){
  975. return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES);
  976. }
  977. /**
  978. * find previous task list by work process id
  979. * @param workProcessInstanceId
  980. * @return
  981. */
  982. public List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer workProcessInstanceId){
  983. return taskInstanceMapper.findValidTaskListByProcessId(workProcessInstanceId, Flag.NO);
  984. }
  985. /**
  986. * update work process instance map
  987. * @param processInstanceMap
  988. * @return
  989. */
  990. public int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap){
  991. return processInstanceMapMapper.update(processInstanceMap);
  992. }
  993. /**
  994. * create work process instance map
  995. * @param processInstanceMap
  996. * @return
  997. */
  998. public int createWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap){
  999. Integer count = 0;
  1000. if(processInstanceMap !=null){
  1001. return processInstanceMapMapper.insert(processInstanceMap);
  1002. }
  1003. return count;
  1004. }
  1005. /**
  1006. * find work process map by parent process id and parent task id.
  1007. * @param parentWorkProcessId
  1008. * @param parentTaskId
  1009. * @return
  1010. */
  1011. public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId){
  1012. return processInstanceMapMapper.queryByParentId(parentWorkProcessId, parentTaskId);
  1013. }
  1014. /**
  1015. * delete work process map by parent process id
  1016. * @param parentWorkProcessId
  1017. * @return
  1018. */
  1019. public int deleteWorkProcessMapByParentId(int parentWorkProcessId){
  1020. return processInstanceMapMapper.deleteByParentProcessId(parentWorkProcessId);
  1021. }
  1022. public ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId){
  1023. ProcessInstance processInstance = null;
  1024. ProcessInstanceMap processInstanceMap = processInstanceMapMapper.queryByParentId(parentProcessId, parentTaskId);
  1025. if(processInstanceMap == null || processInstanceMap.getProcessInstanceId() == 0){
  1026. return processInstance;
  1027. }
  1028. processInstance = findProcessInstanceById(processInstanceMap.getProcessInstanceId());
  1029. return processInstance;
  1030. }
  1031. public ProcessInstance findParentProcessInstance(Integer subProcessId) {
  1032. ProcessInstance processInstance = null;
  1033. ProcessInstanceMap processInstanceMap = processInstanceMapMapper.queryBySubProcessId(subProcessId);
  1034. if(processInstanceMap == null || processInstanceMap.getProcessInstanceId() == 0){
  1035. return processInstance;
  1036. }
  1037. processInstance = findProcessInstanceById(processInstanceMap.getParentProcessInstanceId());
  1038. return processInstance;
  1039. }
  1040. /**
  1041. * change task state
  1042. * @param state
  1043. * @param startTime
  1044. * @param host
  1045. * @param executePath
  1046. */
  1047. public void changeTaskState(ExecutionStatus state, Date startTime, String host,
  1048. String executePath,
  1049. String logPath,
  1050. int taskInstId) {
  1051. TaskInstance taskInstance = taskInstanceMapper.queryById(taskInstId);
  1052. taskInstance.setState(state);
  1053. taskInstance.setStartTime(startTime);
  1054. taskInstance.setHost(host);
  1055. taskInstance.setExecutePath(executePath);
  1056. taskInstance.setLogPath(logPath);
  1057. saveTaskInstance(taskInstance);
  1058. }
  1059. /**
  1060. * update process instance
  1061. * @param instance
  1062. * @return
  1063. */
  1064. public int updateProcessInstance(ProcessInstance instance){
  1065. return processInstanceMapper.update(instance);
  1066. }
  1067. /**
  1068. * update the process instance
  1069. * @param processInstanceId
  1070. * @param processJson
  1071. * @param globalParams
  1072. * @param scheduleTime
  1073. * @param flag
  1074. * @param locations
  1075. * @param connects
  1076. * @return
  1077. */
  1078. public int updateProcessInstance(Integer processInstanceId, String processJson,
  1079. String globalParams, Date scheduleTime, Flag flag,
  1080. String locations, String connects){
  1081. return processInstanceMapper.updateProcessInstance( processInstanceId, processJson,
  1082. globalParams, scheduleTime, locations, connects, flag);
  1083. }
  1084. /**
  1085. * change task state
  1086. * @param state
  1087. * @param endTime
  1088. */
  1089. public void changeTaskState(ExecutionStatus state,
  1090. Date endTime,
  1091. int taskInstId) {
  1092. TaskInstance taskInstance = taskInstanceMapper.queryById(taskInstId);
  1093. taskInstance.setState(state);
  1094. taskInstance.setEndTime(endTime);
  1095. saveTaskInstance(taskInstance);
  1096. }
  1097. /**
  1098. * convert integer list to string list
  1099. * @param intList
  1100. * @return
  1101. */
  1102. public List<String> convertIntListToString(List<Integer> intList){
  1103. if(intList == null){
  1104. return new ArrayList<>();
  1105. }
  1106. List<String> result = new ArrayList<String>(intList.size());
  1107. for(Integer intVar : intList){
  1108. result.add(String.valueOf(intVar));
  1109. }
  1110. return result;
  1111. }
  1112. /**
  1113. * set task
  1114. * 根据任务实例id设置pid
  1115. * @param taskInstId
  1116. * @param pid
  1117. */
  1118. public void updatePidByTaskInstId(int taskInstId, int pid) {
  1119. TaskInstance taskInstance = taskInstanceMapper.queryById(taskInstId);
  1120. taskInstance.setPid(pid);
  1121. taskInstance.setAppLink("");
  1122. saveTaskInstance(taskInstance);
  1123. }
  1124. /**
  1125. * update pid and app links field by task instance id
  1126. * @param taskInstId
  1127. * @param pid
  1128. */
  1129. public void updatePidByTaskInstId(int taskInstId, int pid,String appLinks) {
  1130. TaskInstance taskInstance = taskInstanceMapper.queryById(taskInstId);
  1131. taskInstance.setPid(pid);
  1132. taskInstance.setAppLink(appLinks);
  1133. saveTaskInstance(taskInstance);
  1134. }
  1135. /**
  1136. * query ProcessDefinition by name
  1137. *
  1138. * @see ProcessDefinition
  1139. */
  1140. public ProcessDefinition findProcessDefineByName(int projectId, String name) {
  1141. ProcessDefinition projectFlow = processDefineMapper.queryByDefineName(projectId, name);
  1142. return projectFlow;
  1143. }
  1144. /**
  1145. * query Schedule <p>
  1146. *
  1147. * @see Schedule
  1148. */
  1149. public Schedule querySchedule(int id) {
  1150. return scheduleMapper.queryById(id);
  1151. }
  1152. public List<ProcessInstance> queryNeedFailoverProcessInstances(String host){
  1153. return processInstanceMapper.queryByHostAndStatus(host, stateArray);
  1154. }
  1155. /**
  1156. * update host null
  1157. * @param host
  1158. * @return
  1159. */
  1160. public int updateNeddFailoverProcessInstances(String host){
  1161. return processInstanceMapper.setFailoverByHostAndStateArray(host, stateArray);
  1162. }
  1163. /**
  1164. * process need failover process instance
  1165. * @param processInstance
  1166. */
  1167. @Transactional(value = "TransactionManager",rollbackFor = Exception.class)
  1168. public void processNeedFailoverProcessInstances(ProcessInstance processInstance){
  1169. //1 update processInstance host is null
  1170. processInstance.setHost("null");
  1171. processInstanceMapper.update(processInstance);
  1172. //2 insert into recover command
  1173. Command cmd = new Command();
  1174. cmd.setProcessDefinitionId(processInstance.getProcessDefinitionId());
  1175. cmd.setCommandParam(String.format("{\"%s\":%d}", Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
  1176. cmd.setExecutorId(processInstance.getExecutorId());
  1177. cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
  1178. createCommand(cmd);
  1179. }
  1180. /**
  1181. * query all need failover task instances by host
  1182. * @param host
  1183. * @return
  1184. */
  1185. public List<TaskInstance> queryNeedFailoverTaskInstances(String host){
  1186. return taskInstanceMapper.queryByHostAndStatus(host,stateArray);
  1187. }
  1188. /**
  1189. * update host null
  1190. * @param host
  1191. * @return
  1192. */
  1193. public int updateNeedFailoverTaskInstances(String host){
  1194. return taskInstanceMapper.setFailoverByHostAndStateArray(host, stateArray);
  1195. }
  1196. /**
  1197. * find data source by id
  1198. * @param id
  1199. * @return
  1200. */
  1201. public DataSource findDataSourceById(int id){
  1202. return dataSourceMapper.queryById(id);
  1203. }
  1204. /**
  1205. * update process instance state by id
  1206. * @param processInstanceId
  1207. * @param executionStatus
  1208. * @return
  1209. */
  1210. public int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) {
  1211. return processInstanceMapper.updateState(processInstanceId, executionStatus);
  1212. }
  1213. /**
  1214. * find process instance by the task id
  1215. * @param taskId
  1216. * @return
  1217. */
  1218. public ProcessInstance findProcessInstanceByTaskId(int taskId){
  1219. return processInstanceMapper.queryByTaskId(taskId);
  1220. }
  1221. /**
  1222. * find udf function list by id list string
  1223. * @param ids
  1224. * @return
  1225. */
  1226. public List<UdfFunc> queryUdfFunListByids(String ids){
  1227. return udfFuncMapper.queryUdfByIdStr(ids);
  1228. }
  1229. /**
  1230. * find tenant code by resource name
  1231. * @param resName
  1232. * @return
  1233. */
  1234. public String queryTenantCodeByResName(String resName){
  1235. return resourceMapper.queryTenantCodeByResourceName(resName);
  1236. }
  1237. /**
  1238. * find schedule list by process define id.
  1239. * @param ids
  1240. * @return
  1241. */
  1242. public List<Schedule> selectAllByProcessDefineId(int[] ids){
  1243. return scheduleMapper.selectAllByProcessDefineArray(ids);
  1244. }
  1245. /**
  1246. * get dependency cycle by work process define id and scheduler fire time
  1247. *
  1248. * @param masterId
  1249. * @param processDefinitionId
  1250. * @param scheduledFireTime 任务调度预计触发的时间
  1251. * @return
  1252. * @throws Exception
  1253. */
  1254. public CycleDependency getCycleDependency(int masterId, int processDefinitionId, Date scheduledFireTime) throws Exception {
  1255. List<CycleDependency> list = getCycleDependencies(masterId,new int[]{processDefinitionId},scheduledFireTime);
  1256. return list.size()>0 ? list.get(0) : null;
  1257. }
  1258. /**
  1259. *
  1260. * get dependency cycle list by work process define id list and scheduler fire time
  1261. * @param masterId
  1262. * @param ids
  1263. * @param scheduledFireTime 任务调度预计触发的时间
  1264. * @return
  1265. * @throws Exception
  1266. */
  1267. public List<CycleDependency> getCycleDependencies(int masterId,int[] ids,Date scheduledFireTime) throws Exception {
  1268. List<CycleDependency> cycleDependencyList = new ArrayList<CycleDependency>();
  1269. if(ArrayUtils.isEmpty(ids)){
  1270. logger.warn("ids[] is empty!is invalid!");
  1271. return cycleDependencyList;
  1272. }
  1273. if(scheduledFireTime == null){
  1274. logger.warn("scheduledFireTime is null!is invalid!");
  1275. return cycleDependencyList;
  1276. }
  1277. String strCrontab = "";
  1278. CronExpression depCronExpression;
  1279. Cron depCron;
  1280. List<Date> list;
  1281. List<Schedule> schedules = this.selectAllByProcessDefineId(ids);
  1282. // 遍历所有的调度信息
  1283. for(Schedule depSchedule:schedules){
  1284. strCrontab = depSchedule.getCrontab();
  1285. depCronExpression = CronUtils.parse2CronExpression(strCrontab);
  1286. depCron = CronUtils.parse2Cron(strCrontab);
  1287. CycleEnum cycleEnum = CronUtils.getMiniCycle(depCron);
  1288. if(cycleEnum == null){
  1289. logger.error("{} is not valid",strCrontab);
  1290. continue;
  1291. }
  1292. Calendar calendar = Calendar.getInstance();
  1293. switch (cycleEnum){
  1294. /*case MINUTE:
  1295. calendar.add(Calendar.MINUTE,-61);*/
  1296. case HOUR:
  1297. calendar.add(Calendar.HOUR,-25);
  1298. break;
  1299. case DAY:
  1300. calendar.add(Calendar.DATE,-32);
  1301. break;
  1302. case WEEK:
  1303. calendar.add(Calendar.DATE,-32);
  1304. break;
  1305. case MONTH:
  1306. calendar.add(Calendar.MONTH,-13);
  1307. break;
  1308. default:
  1309. logger.warn("Dependent process definition's cycleEnum is {},not support!!", cycleEnum.name());
  1310. continue;
  1311. }
  1312. Date start = calendar.getTime();
  1313. if(depSchedule.getProcessDefinitionId() == masterId){
  1314. list = CronUtils.getSelfFireDateList(start, scheduledFireTime, depCronExpression);
  1315. }else {
  1316. list = CronUtils.getFireDateList(start, scheduledFireTime, depCronExpression);
  1317. }
  1318. if(list.size()>=1){
  1319. start = list.get(list.size()-1);
  1320. CycleDependency dependency = new CycleDependency(depSchedule.getProcessDefinitionId(),start, CronUtils.getExpirationTime(start, cycleEnum), cycleEnum);
  1321. cycleDependencyList.add(dependency);
  1322. }
  1323. }
  1324. return cycleDependencyList;
  1325. }
  1326. /**
  1327. * find process instance by time interval
  1328. * @param defineId
  1329. * @param startTime
  1330. * @param endTime
  1331. * @return
  1332. */
  1333. public ProcessInstance findProcessInstanceByTimeInterval(int defineId, Date startTime, Date endTime, int excludeId) {
  1334. return processInstanceMapper.queryByScheduleTime(defineId, null, excludeId,
  1335. DateUtils.dateToString(startTime), DateUtils.dateToString(endTime));
  1336. }
  1337. public void selfFaultTolerant(int state){
  1338. List<ProcessInstance> processInstanceList = processInstanceMapper.listByStatus(new int[]{state});
  1339. for (ProcessInstance processInstance:processInstanceList){
  1340. selfFaultTolerant(processInstance);
  1341. }
  1342. }
  1343. @Transactional(value = "TransactionManager",rollbackFor = Exception.class)
  1344. public void selfFaultTolerant(ProcessInstance processInstance){
  1345. processInstance.setState(ExecutionStatus.FAILURE);
  1346. processInstanceMapper.update(processInstance);
  1347. // insert to command
  1348. Command command = new Command();
  1349. command.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
  1350. command.setProcessDefinitionId(processInstance.getProcessDefinitionId());
  1351. command.setCommandParam(String.format("{\"%s\":%d}",
  1352. CMDPARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
  1353. command.setExecutorId(processInstance.getExecutorId());
  1354. command.setProcessInstancePriority(processInstance.getProcessInstancePriority());
  1355. createCommand(command);
  1356. }
  1357. /**
  1358. * find last scheduler process instance in the date interval
  1359. * @param definitionId
  1360. * @param dateInterval
  1361. * @return
  1362. */
  1363. public ProcessInstance findLastSchedulerProcessInterval(int definitionId, DateInterval dateInterval) {
  1364. return processInstanceMapper.queryLastSchedulerProcess(definitionId,
  1365. DateUtils.dateToString(dateInterval.getStartTime()),
  1366. DateUtils.dateToString(dateInterval.getEndTime()));
  1367. }
  1368. public ProcessInstance findLastManualProcessInterval(int definitionId, DateInterval dateInterval) {
  1369. return processInstanceMapper.queryLastManualProcess(definitionId,
  1370. DateUtils.dateToString(dateInterval.getStartTime()),
  1371. DateUtils.dateToString(dateInterval.getEndTime()));
  1372. }
  1373. public ProcessInstance findLastRunningProcess(int definitionId, DateInterval dateInterval) {
  1374. return processInstanceMapper.queryLastRunningProcess(definitionId,
  1375. DateUtils.dateToString(dateInterval.getStartTime()),
  1376. DateUtils.dateToString(dateInterval.getEndTime()),
  1377. stateArray);
  1378. }
  1379. }