ProcessDao.java 66 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one or more
  3. * contributor license agreements. See the NOTICE file distributed with
  4. * this work for additional information regarding copyright ownership.
  5. * The ASF licenses this file to You under the Apache License, Version 2.0
  6. * (the "License"); you may not use this file except in compliance with
  7. * the License. You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package cn.escheduler.dao;
  18. import cn.escheduler.common.Constants;
  19. import cn.escheduler.common.enums.*;
  20. import cn.escheduler.common.model.DateInterval;
  21. import cn.escheduler.common.model.TaskNode;
  22. import cn.escheduler.common.process.Property;
  23. import cn.escheduler.common.queue.ITaskQueue;
  24. import cn.escheduler.common.queue.TaskQueueFactory;
  25. import cn.escheduler.common.task.subprocess.SubProcessParameters;
  26. import cn.escheduler.common.utils.DateUtils;
  27. import cn.escheduler.common.utils.IpUtils;
  28. import cn.escheduler.common.utils.JSONUtils;
  29. import cn.escheduler.common.utils.ParameterUtils;
  30. import cn.escheduler.dao.mapper.*;
  31. import cn.escheduler.dao.model.*;
  32. import cn.escheduler.dao.utils.cron.CronUtils;
  33. import com.alibaba.fastjson.JSONObject;
  34. import com.cronutils.model.Cron;
  35. import org.apache.commons.lang3.ArrayUtils;
  36. import org.apache.commons.lang3.StringUtils;
  37. import org.quartz.CronExpression;
  38. import org.slf4j.Logger;
  39. import org.slf4j.LoggerFactory;
  40. import org.springframework.beans.factory.annotation.Autowired;
  41. import org.springframework.stereotype.Component;
  42. import org.springframework.transaction.annotation.Transactional;
  43. import java.util.*;
  44. import java.util.stream.Collectors;
  45. import static cn.escheduler.common.Constants.*;
  46. import static cn.escheduler.dao.datasource.ConnectionFactory.getMapper;
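  /**
   * DAO for process-related data. It bundles the mappers for process definitions, process
   * instances, task instances, commands and related tables behind one component.
   */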
  50. @Component
  51. public class ProcessDao extends AbstractBaseDao {
  52. private final Logger logger = LoggerFactory.getLogger(getClass());
  53. private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
  54. ExecutionStatus.RUNNING_EXEUTION.ordinal(),
  55. ExecutionStatus.READY_PAUSE.ordinal(),
  56. // ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
  57. ExecutionStatus.READY_STOP.ordinal()};
  58. @Autowired
  59. private UserMapper userMapper;
  60. @Autowired
  61. private ProcessDefinitionMapper processDefineMapper;
  62. @Autowired
  63. private ProcessInstanceMapper processInstanceMapper;
  64. @Autowired
  65. private DataSourceMapper dataSourceMapper;
  66. @Autowired
  67. private ProcessInstanceMapMapper processInstanceMapMapper;
  68. @Autowired
  69. private TaskInstanceMapper taskInstanceMapper;
  70. @Autowired
  71. private CommandMapper commandMapper;
  72. @Autowired
  73. private ScheduleMapper scheduleMapper;
  74. @Autowired
  75. private UdfFuncMapper udfFuncMapper;
  76. @Autowired
  77. private ResourceMapper resourceMapper;
  78. @Autowired
  79. private WorkerGroupMapper workerGroupMapper;
  80. @Autowired
  81. private ErrorCommandMapper errorCommandMapper;
  82. @Autowired
  83. private WorkerServerMapper workerServerMapper;
  84. @Autowired
  85. private TenantMapper tenantMapper;
  86. /**
  87. * task queue impl
  88. */
  89. protected ITaskQueue taskQueue;
  /**
   * Creates the DAO and immediately runs {@link #init()}, which assigns the mapper fields
   * and the task queue.
   */
  public ProcessDao() {
    this.init();
  }
  93. /**
  94. * initialize
  95. */
  96. @Override
  97. protected void init() {
  98. userMapper = getMapper(UserMapper.class);
  99. processDefineMapper = getMapper(ProcessDefinitionMapper.class);
  100. processInstanceMapper = getMapper(ProcessInstanceMapper.class);
  101. dataSourceMapper = getMapper(DataSourceMapper.class);
  102. processInstanceMapMapper = getMapper(ProcessInstanceMapMapper.class);
  103. taskInstanceMapper = getMapper(TaskInstanceMapper.class);
  104. commandMapper = getMapper(CommandMapper.class);
  105. scheduleMapper = getMapper(ScheduleMapper.class);
  106. udfFuncMapper = getMapper(UdfFuncMapper.class);
  107. resourceMapper = getMapper(ResourceMapper.class);
  108. workerGroupMapper = getMapper(WorkerGroupMapper.class);
  109. workerServerMapper = getMapper(WorkerServerMapper.class);
  110. taskQueue = TaskQueueFactory.getTaskQueueInstance();
  111. tenantMapper = getMapper(TenantMapper.class);
  112. }
  113. /**
  114. * find one command from command queue, construct process instance
  115. * @param logger
  116. * @param host
  117. * @param validThreadNum
  118. * @return
  119. */
  120. @Transactional(value = "TransactionManager",rollbackFor = Exception.class)
  121. public ProcessInstance scanCommand(Logger logger, String host, int validThreadNum){
  122. ProcessInstance processInstance = null;
  123. Command command = findOneCommand();
  124. if (command == null) {
  125. return null;
  126. }
  127. logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));
  128. try{
  129. processInstance = constructProcessInstance(command, host);
  130. //cannot construct process instance, return null;
  131. if(processInstance == null){
  132. logger.error("scan command, command parameter is error: %s", command.toString());
  133. delCommandByid(command.getId());
  134. saveErrorCommand(command, "process instance is null");
  135. return null;
  136. }else if(!checkThreadNum(command, validThreadNum)){
  137. logger.info("there is not enough thread for this command: {}",command.toString() );
  138. return setWaitingThreadProcess(command, processInstance);
  139. }else{
  140. processInstance.setCommandType(command.getCommandType());
  141. processInstance.addHistoryCmd(command.getCommandType());
  142. saveProcessInstance(processInstance);
  143. this.setSubProcessParam(processInstance);
  144. delCommandByid(command.getId());
  145. return processInstance;
  146. }
  147. }catch (Exception e){
  148. logger.error("scan command error ", e);
  149. saveErrorCommand(command, e.toString());
  150. delCommandByid(command.getId());
  151. }
  152. return null;
  153. }
  154. private void saveErrorCommand(Command command, String message) {
  155. ErrorCommand errorCommand = new ErrorCommand(command, message);
  156. this.errorCommandMapper.insert(errorCommand);
  157. }
  158. /**
  159. * set process waiting thread
  160. * @param command
  161. * @param processInstance
  162. * @return
  163. */
  164. private ProcessInstance setWaitingThreadProcess(Command command, ProcessInstance processInstance) {
  165. processInstance.setState(ExecutionStatus.WAITTING_THREAD);
  166. if(command.getCommandType() != CommandType.RECOVER_WAITTING_THREAD){
  167. processInstance.addHistoryCmd(command.getCommandType());
  168. }
  169. saveProcessInstance(processInstance);
  170. this.setSubProcessParam(processInstance);
  171. createRecoveryWaitingThreadCommand(command, processInstance);
  172. return null;
  173. }
  174. private boolean checkThreadNum(Command command, int validThreadNum) {
  175. int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId());
  176. return validThreadNum >= commandThreadCount;
  177. }
  178. /**
  179. * insert one command
  180. */
  181. public int createCommand(Command command) {
  182. int result = 0;
  183. if (command != null){
  184. result = commandMapper.insert(command);
  185. }
  186. return result;
  187. }
  188. /**
  189. *
  190. * find one command from queue list
  191. * @return
  192. */
  193. public Command findOneCommand(){
  194. return commandMapper.queryOneCommand();
  195. }
  196. /**
  197. * check the input command exists in queue list
  198. * @param command
  199. * @return
  200. */
  201. public Boolean verifyIsNeedCreateCommand(Command command){
  202. Boolean isNeedCreate = true;
  203. Map<CommandType,Integer> cmdTypeMap = new HashMap<CommandType,Integer>();
  204. cmdTypeMap.put(CommandType.REPEAT_RUNNING,1);
  205. cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS,1);
  206. cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS,1);
  207. CommandType commandType = command.getCommandType();
  208. if(cmdTypeMap.containsKey(commandType)){
  209. JSONObject cmdParamObj = (JSONObject) JSONObject.parse(command.getCommandParam());
  210. JSONObject tempObj;
  211. int processInstanceId = cmdParamObj.getInteger(CMDPARAM_RECOVER_PROCESS_ID_STRING);
  212. List<Command> commands = commandMapper.queryAllCommand();
  213. //遍历所有命令
  214. for (Command tmpCommand:commands){
  215. if(cmdTypeMap.containsKey(tmpCommand.getCommandType())){
  216. tempObj = (JSONObject) JSONObject.parse(tmpCommand.getCommandParam());
  217. if(tempObj != null && processInstanceId == tempObj.getInteger(CMDPARAM_RECOVER_PROCESS_ID_STRING)){
  218. isNeedCreate = false;
  219. break;
  220. }
  221. }
  222. }
  223. }
  224. return isNeedCreate;
  225. }
  226. /**
  227. * find process instance detail by id
  228. * @param processId
  229. * @return
  230. */
  231. public ProcessInstance findProcessInstanceDetailById(int processId){
  232. return processInstanceMapper.queryDetailById(processId);
  233. }
  234. /**
  235. * find process instance by id
  236. * @param processId
  237. * @return
  238. */
  239. public ProcessInstance findProcessInstanceById(int processId){
  240. return processInstanceMapper.queryById(processId);
  241. }
  242. /**
  243. * find process instance by scheduler time.
  244. * @param defineId
  245. * @param scheduleTime
  246. * @return
  247. */
  248. public ProcessInstance findProcessInstanceByScheduleTime(int defineId, Date scheduleTime){
  249. return processInstanceMapper.queryByScheduleTime(defineId,
  250. DateUtils.dateToString(scheduleTime), 0, null, null);
  251. }
  252. /**
  253. * find process define by id.
  254. * @param processDefinitionId
  255. * @return
  256. */
  257. public ProcessDefinition findProcessDefineById(int processDefinitionId) {
  258. return processDefineMapper.queryByDefineId(processDefinitionId);
  259. }
  260. /**
  261. * delete work process instance by id
  262. * @param processInstanceId
  263. * @return
  264. */
  265. public int deleteWorkProcessInstanceById(int processInstanceId){
  266. return processInstanceMapper.delete(processInstanceId);
  267. }
  268. /**
  269. *
  270. * delete all sub process by parent instance id
  271. * @return
  272. */
  273. public int deleteAllSubWorkProcessByParentId(int processInstanceId){
  274. List<Integer> subProcessIdList = processInstanceMapper.querySubIdListByParentId(processInstanceId);
  275. for(Integer subId : subProcessIdList ){
  276. deleteAllSubWorkProcessByParentId(subId);
  277. deleteWorkProcessMapByParentId(subId);
  278. deleteWorkProcessInstanceById(subId);
  279. }
  280. return 1;
  281. }
  282. /**
  283. * create process define
  284. * @param processDefinition
  285. * @return
  286. */
  287. public int createProcessDefine(ProcessDefinition processDefinition){
  288. int count = 0;
  289. if(processDefinition != null){
  290. count = this.processDefineMapper.insert(processDefinition);
  291. }
  292. return count;
  293. }
  294. /**
  295. * calculate sub process number in the process define.
  296. * @param processDefinitionId
  297. * @return
  298. */
  299. private Integer workProcessThreadNumCount(Integer processDefinitionId){
  300. List<String> ids = new ArrayList<>();
  301. recurseFindSubProcessId(processDefinitionId, ids);
  302. return ids.size()+1;
  303. }
  304. /**
  305. * recursive query sub process definition id by parent id.
  306. * @param parentId
  307. * @param ids
  308. */
  309. public void recurseFindSubProcessId(int parentId, List<String> ids){
  310. ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(parentId);
  311. String processDefinitionJson = processDefinition.getProcessDefinitionJson();
  312. ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
  313. List<TaskNode> taskNodeList = processData.getTasks();
  314. if (taskNodeList != null && taskNodeList.size() > 0){
  315. for (TaskNode taskNode : taskNodeList){
  316. String parameter = taskNode.getParams();
  317. if (parameter.contains(CMDPARAM_SUB_PROCESS_DEFINE_ID)){
  318. SubProcessParameters subProcessParam = JSONObject.parseObject(parameter, SubProcessParameters.class);
  319. ids.add(String.valueOf(subProcessParam.getProcessDefinitionId()));
  320. recurseFindSubProcessId(subProcessParam.getProcessDefinitionId(),ids);
  321. }
  322. }
  323. }
  324. }
  325. /**
  326. * create recovery waiting thread command when thread pool is not enough for the process instance.
  327. * sub work process instance need not to create recovery command.
  328. * create recovery waiting thread command and delete origin command at the same time.
  329. * if the recovery command is exists, only update the field update_time
  330. * @param originCommand
  331. * @param processInstance
  332. */
  333. public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) {
  334. // sub process doesnot need to create wait command
  335. if(processInstance.getIsSubProcess() == Flag.YES){
  336. if(originCommand != null){
  337. commandMapper.delete(originCommand.getId());
  338. }
  339. return;
  340. }
  341. Map<String, String> cmdParam = new HashMap<>();
  342. cmdParam.put(Constants.CMDPARAM_RECOVERY_WAITTING_THREAD, String.valueOf(processInstance.getId()));
  343. // process instance quit by "waiting thread" state
  344. if(originCommand == null){
  345. Command command = new Command(
  346. CommandType.RECOVER_WAITTING_THREAD,
  347. processInstance.getTaskDependType(),
  348. processInstance.getFailureStrategy(),
  349. processInstance.getExecutorId(),
  350. processInstance.getProcessDefinitionId(),
  351. JSONUtils.toJson(cmdParam),
  352. processInstance.getWarningType(),
  353. processInstance.getWarningGroupId(),
  354. processInstance.getScheduleTime(),
  355. processInstance.getProcessInstancePriority()
  356. );
  357. saveCommand(command);
  358. return ;
  359. }
  360. // update the command time if current command if recover from waiting
  361. if(originCommand.getCommandType() == CommandType.RECOVER_WAITTING_THREAD){
  362. originCommand.setUpdateTime(new Date());
  363. saveCommand(originCommand);
  364. }else{
  365. // delete old command and create new waiting thread command
  366. commandMapper.delete(originCommand.getId());
  367. originCommand.setId(0);
  368. originCommand.setCommandType(CommandType.RECOVER_WAITTING_THREAD);
  369. originCommand.setUpdateTime(new Date());
  370. originCommand.setCommandParam(JSONUtils.toJson(cmdParam));
  371. originCommand.setProcessInstancePriority(processInstance.getProcessInstancePriority());
  372. saveCommand(originCommand);
  373. }
  374. }
  375. /**
  376. * get schedule time from command
  377. * @param command
  378. * @param cmdParam
  379. * @return
  380. */
  381. private Date getScheduleTime(Command command, Map<String, String> cmdParam){
  382. Date scheduleTime = command.getScheduleTime();
  383. if(scheduleTime == null){
  384. if(cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)){
  385. scheduleTime = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
  386. }
  387. }
  388. return scheduleTime;
  389. }
  390. /**
  391. * generate a new work process instance from command.
  392. * @param processDefinition
  393. * @param command
  394. * @param cmdParam
  395. * @return
  396. */
  397. private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition,
  398. Command command,
  399. Map<String, String> cmdParam){
  400. ProcessInstance processInstance = new ProcessInstance(processDefinition);
  401. processInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
  402. processInstance.setRecovery(Flag.NO);
  403. processInstance.setStartTime(new Date());
  404. processInstance.setRunTimes(1);
  405. processInstance.setMaxTryTimes(0);
  406. processInstance.setProcessDefinitionId(command.getProcessDefinitionId());
  407. processInstance.setCommandParam(command.getCommandParam());
  408. processInstance.setCommandType(command.getCommandType());
  409. processInstance.setIsSubProcess(Flag.NO);
  410. processInstance.setTaskDependType(command.getTaskDependType());
  411. processInstance.setFailureStrategy(command.getFailureStrategy());
  412. processInstance.setExecutorId(command.getExecutorId());
  413. WarningType warningType = command.getWarningType() == null ? WarningType.NONE : command.getWarningType();
  414. processInstance.setWarningType(warningType);
  415. Integer warningGroupId = command.getWarningGroupId() == null ? 0 : command.getWarningGroupId();
  416. processInstance.setWarningGroupId(warningGroupId);
  417. // schedule time
  418. Date scheduleTime = getScheduleTime(command, cmdParam);
  419. if(scheduleTime != null){
  420. processInstance.setScheduleTime(scheduleTime);
  421. }
  422. processInstance.setCommandStartTime(command.getStartTime());
  423. processInstance.setLocations(processDefinition.getLocations());
  424. processInstance.setConnects(processDefinition.getConnects());
  425. // curing global params
  426. processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
  427. processDefinition.getGlobalParamMap(),
  428. processDefinition.getGlobalParamList(),
  429. getCommandTypeIfComplement(processInstance, command),
  430. processInstance.getScheduleTime()));
  431. //copy process define json to process instance
  432. processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson());
  433. // set process instance priority
  434. processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
  435. int workerGroupId = command.getWorkerGroupId() == 0 ? -1 : command.getWorkerGroupId();
  436. processInstance.setWorkerGroupId(workerGroupId);
  437. processInstance.setTimeout(processDefinition.getTimeout());
  438. processInstance.setTenantId(processDefinition.getTenantId());
  439. return processInstance;
  440. }
  441. /**
  442. * get process tenant
  443. * there is tenant id in definition, use the tenant of the definition.
  444. * if there is not tenant id in the definiton or the tenant not exist
  445. * use definition creator's tenant.
  446. * @param tenantId
  447. * @param userId
  448. * @return
  449. */
  450. public Tenant getTenantForProcess(int tenantId, int userId){
  451. Tenant tenant = null;
  452. if(tenantId >= 0){
  453. tenant = tenantMapper.queryById(tenantId);
  454. }
  455. if(tenant == null){
  456. User user = userMapper.queryById(userId);
  457. tenant = tenantMapper.queryById(user.getTenantId());
  458. }
  459. return tenant;
  460. }
  461. /**
  462. * check command parameters is valid
  463. * @param command
  464. * @param cmdParam
  465. * @return
  466. */
  467. private Boolean checkCmdParam(Command command, Map<String, String> cmdParam){
  468. if(command.getTaskDependType() == TaskDependType.TASK_ONLY || command.getTaskDependType()== TaskDependType.TASK_PRE){
  469. if(cmdParam == null
  470. || !cmdParam.containsKey(Constants.CMDPARAM_START_NODE_NAMES)
  471. || cmdParam.get(Constants.CMDPARAM_START_NODE_NAMES).isEmpty()){
  472. logger.error(String.format("command node depend type is %s, but start nodes is null ", command.getTaskDependType().toString()));
  473. return false;
  474. }
  475. }
  476. return true;
  477. }
  478. /**
  479. * construct process instance according to one command.
  480. * @param command
  481. * @param host
  482. * @return
  483. */
  484. private ProcessInstance constructProcessInstance(Command command, String host){
  485. ProcessInstance processInstance = null;
  486. CommandType commandType = command.getCommandType();
  487. Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
  488. ProcessDefinition processDefinition = null;
  489. if(command.getProcessDefinitionId() != 0){
  490. processDefinition = processDefineMapper.queryByDefineId(command.getProcessDefinitionId());
  491. if(processDefinition == null){
  492. logger.error(String.format("cannot find the work process define! define id : %d", command.getProcessDefinitionId()));
  493. return null;
  494. }
  495. }
  496. if(cmdParam != null ){
  497. Integer processInstanceId = 0;
  498. // recover from failure or pause tasks
  499. if(cmdParam.containsKey(Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING)) {
  500. String processId = cmdParam.get(Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING);
  501. processInstanceId = Integer.parseInt(processId);
  502. if (processInstanceId == 0) {
  503. logger.error("command parameter is error, [ ProcessInstanceId ] is 0");
  504. return null;
  505. }
  506. }else if(cmdParam.containsKey(Constants.CMDPARAM_SUB_PROCESS)){
  507. // sub process map
  508. String pId = cmdParam.get(Constants.CMDPARAM_SUB_PROCESS);
  509. processInstanceId = Integer.parseInt(pId);
  510. }else if(cmdParam.containsKey(Constants.CMDPARAM_RECOVERY_WAITTING_THREAD)){
  511. // waiting thread command
  512. String pId = cmdParam.get(Constants.CMDPARAM_RECOVERY_WAITTING_THREAD);
  513. processInstanceId = Integer.parseInt(pId);
  514. }
  515. if(processInstanceId ==0){
  516. processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
  517. }else{
  518. processInstance = this.findProcessInstanceDetailById(processInstanceId);
  519. }
  520. processDefinition = processDefineMapper.queryByDefineId(processInstance.getProcessDefinitionId());
  521. processInstance.setProcessDefinition(processDefinition);
  522. //reset command parameter
  523. if(processInstance.getCommandParam() != null){
  524. Map<String, String> processCmdParam = JSONUtils.toMap(processInstance.getCommandParam());
  525. for(String key : processCmdParam.keySet()){
  526. if(!cmdParam.containsKey(key)){
  527. cmdParam.put(key,processCmdParam.get(key));
  528. }
  529. }
  530. }
  531. // reset command parameter if sub process
  532. if(cmdParam.containsKey(Constants.CMDPARAM_SUB_PROCESS)){
  533. processInstance.setCommandParam(command.getCommandParam());
  534. }
  535. }else{
  536. // generate one new process instance
  537. processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
  538. }
  539. if(!checkCmdParam(command, cmdParam)){
  540. logger.error("command parameter check failed!");
  541. return null;
  542. }
  543. if(command.getScheduleTime() != null){
  544. processInstance.setScheduleTime(command.getScheduleTime());
  545. }
  546. processInstance.setHost(host);
  547. ExecutionStatus runStatus = ExecutionStatus.RUNNING_EXEUTION;
  548. int runTime = processInstance.getRunTimes();
  549. switch (commandType){
  550. case START_PROCESS:
  551. break;
  552. case START_FAILURE_TASK_PROCESS:
  553. // find failed tasks and init these tasks
  554. List<Integer> failedList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.FAILURE);
  555. List<Integer> toleranceList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.NEED_FAULT_TOLERANCE);
  556. List<Integer> killedList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.KILL);
  557. cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);
  558. failedList.addAll(killedList);
  559. failedList.addAll(toleranceList);
  560. for(Integer taskId : failedList){
  561. initTaskInstance(this.findTaskInstanceById(taskId));
  562. }
  563. cmdParam.put(Constants.CMDPARAM_RECOVERY_START_NODE_STRING,
  564. String.join(Constants.COMMA, convertIntListToString(failedList)));
  565. processInstance.setCommandParam(JSONUtils.toJson(cmdParam));
  566. processInstance.setRunTimes(runTime +1 );
  567. break;
  568. case START_CURRENT_TASK_PROCESS:
  569. break;
  570. case RECOVER_WAITTING_THREAD:
  571. break;
  572. case RECOVER_SUSPENDED_PROCESS:
  573. // find pause tasks and init task's state
  574. cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);
  575. List<Integer> suspendedNodeList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE);
  576. List<Integer> stopNodeList = findTaskIdByInstanceState(processInstance.getId(),
  577. ExecutionStatus.KILL);
  578. suspendedNodeList.addAll(stopNodeList);
  579. for(Integer taskId : suspendedNodeList){
  580. // 把暂停状态初始化
  581. initTaskInstance(this.findTaskInstanceById(taskId));
  582. }
  583. cmdParam.put(Constants.CMDPARAM_RECOVERY_START_NODE_STRING, String.join(",", convertIntListToString(suspendedNodeList)));
  584. processInstance.setCommandParam(JSONUtils.toJson(cmdParam));
  585. processInstance.setRunTimes(runTime +1);
  586. break;
  587. case RECOVER_TOLERANCE_FAULT_PROCESS:
  588. // recover tolerance fault process
  589. processInstance.setRecovery(Flag.YES);
  590. runStatus = processInstance.getState();
  591. break;
  592. case COMPLEMENT_DATA:
  593. // delete all the valid tasks when complement data
  594. List<TaskInstance> taskInstanceList = this.findValidTaskListByProcessId(processInstance.getId());
  595. for(TaskInstance taskInstance : taskInstanceList){
  596. taskInstance.setFlag(Flag.NO);
  597. this.updateTaskInstance(taskInstance);
  598. }
  599. break;
  600. case REPEAT_RUNNING:
  601. // delete the recover task names from command parameter
  602. if(cmdParam.containsKey(Constants.CMDPARAM_RECOVERY_START_NODE_STRING)){
  603. cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);
  604. processInstance.setCommandParam(JSONUtils.toJson(cmdParam));
  605. }
  606. // delete all the valid tasks when repeat running
  607. List<TaskInstance> validTaskList = findValidTaskListByProcessId(processInstance.getId());
  608. for(TaskInstance taskInstance : validTaskList){
  609. taskInstance.setFlag(Flag.NO);
  610. updateTaskInstance(taskInstance);
  611. }
  612. processInstance.setStartTime(new Date());
  613. processInstance.setEndTime(null);
  614. processInstance.setRunTimes(runTime +1);
  615. initComplementDataParam(processDefinition, processInstance, cmdParam);
  616. break;
  617. case SCHEDULER:
  618. break;
  619. default:
  620. break;
  621. }
  622. processInstance.setState(runStatus);
  623. return processInstance;
  624. }
  625. /**
  626. * return complement data if the process start with complement data
  627. */
  628. private CommandType getCommandTypeIfComplement(ProcessInstance processInstance, Command command){
  629. if(CommandType.COMPLEMENT_DATA == processInstance.getCmdTypeIfComplement()){
  630. return CommandType.COMPLEMENT_DATA;
  631. }else{
  632. return command.getCommandType();
  633. }
  634. }
  635. /**
  636. * initialize complement data parameters
  637. * @param processDefinition
  638. * @param processInstance
  639. * @param cmdParam
  640. */
  641. private void initComplementDataParam(ProcessDefinition processDefinition, ProcessInstance processInstance, Map<String, String> cmdParam) {
  642. if(!processInstance.isComplementData()){
  643. return;
  644. }
  645. Date startComplementTime = DateUtils.parse(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE),
  646. YYYY_MM_DD_HH_MM_SS);
  647. processInstance.setScheduleTime(startComplementTime);
  648. processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
  649. processDefinition.getGlobalParamMap(),
  650. processDefinition.getGlobalParamList(),
  651. CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
  652. }
  653. /**
  654. * set sub work process parameters.
  655. * handle sub work process instance, update relation table and command parameters
  656. * set sub work process flag, extends parent work process command parameters.
  657. */
  658. public ProcessInstance setSubProcessParam(ProcessInstance subProcessInstance){
  659. String cmdParam = subProcessInstance.getCommandParam();
  660. if(StringUtils.isEmpty(cmdParam)){
  661. return subProcessInstance;
  662. }
  663. Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
  664. // write sub process id into cmd param.
  665. if(paramMap.containsKey(CMDPARAM_SUB_PROCESS)
  666. && CMDPARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMDPARAM_SUB_PROCESS))){
  667. paramMap.remove(CMDPARAM_SUB_PROCESS);
  668. paramMap.put(CMDPARAM_SUB_PROCESS, String.valueOf(subProcessInstance.getId()));
  669. subProcessInstance.setCommandParam(JSONUtils.toJson(paramMap));
  670. subProcessInstance.setIsSubProcess(Flag.YES);
  671. this.saveProcessInstance(subProcessInstance);
  672. }
  673. // copy parent instance user def params to sub process..
  674. String parentInstanceId = paramMap.get(CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID);
  675. if(StringUtils.isNotEmpty(parentInstanceId)){
  676. ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId));
  677. if(parentInstance != null){
  678. subProcessInstance.setGlobalParams(
  679. joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams()));
  680. this.saveProcessInstance(subProcessInstance);
  681. }else{
  682. logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam);
  683. }
  684. }
  685. ProcessInstanceMap processInstanceMap = JSONUtils.parseObject(cmdParam, ProcessInstanceMap.class);
  686. if(processInstanceMap == null || processInstanceMap.getParentProcessInstanceId() == 0){
  687. return subProcessInstance;
  688. }
  689. // update sub process id to process map table
  690. processInstanceMap.setProcessInstanceId(subProcessInstance.getId());
  691. this.updateWorkProcessInstanceMap(processInstanceMap);
  692. return subProcessInstance;
  693. }
  694. /**
  695. * join parent global params into sub process.
  696. * only the keys doesn't in sub process global would be joined.
  697. * @param parentGlobalParams
  698. * @param subGlobalParams
  699. * @return
  700. */
  701. private String joinGlobalParams(String parentGlobalParams, String subGlobalParams){
  702. List<Property> parentPropertyList = JSONUtils.toList(parentGlobalParams, Property.class);
  703. List<Property> subPropertyList = JSONUtils.toList(subGlobalParams, Property.class);
  704. Map<String,String> subMap = subPropertyList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
  705. for(Property parent : parentPropertyList){
  706. if(!subMap.containsKey(parent.getProp())){
  707. subPropertyList.add(parent);
  708. }
  709. }
  710. return JSONUtils.toJson(subPropertyList);
  711. }
  712. /**
  713. * initialize task instance
  714. * @param taskInstance
  715. */
  716. private void initTaskInstance(TaskInstance taskInstance){
  717. if(!taskInstance.isSubProcess()){
  718. if(taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure()){
  719. taskInstance.setFlag(Flag.NO);
  720. updateTaskInstance(taskInstance);
  721. return;
  722. }
  723. }
  724. taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
  725. updateTaskInstance(taskInstance);
  726. }
  727. /**
  728. * submit task to mysql and task queue
  729. * submit sub process to command
  730. * @param taskInstance
  731. * @return
  732. */
  733. @Transactional(value = "TransactionManager",rollbackFor = Exception.class)
  734. public TaskInstance submitTask(TaskInstance taskInstance, ProcessInstance processInstance){
  735. logger.info("start submit task : {}, instance id:{}, state: {}, ",
  736. taskInstance.getName(), processInstance.getId(), processInstance.getState() );
  737. processInstance = this.findProcessInstanceDetailById(processInstance.getId());
  738. //submit to mysql
  739. TaskInstance task= submitTaskInstanceToMysql(taskInstance, processInstance);
  740. if(task.isSubProcess() && !task.getState().typeIsFinished()){
  741. ProcessInstanceMap processInstanceMap = setProcessInstanceMap(processInstance, task);
  742. TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
  743. Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
  744. Integer defineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
  745. createSubWorkProcessCommand(processInstance, processInstanceMap, defineId, task);
  746. }else if(!task.getState().typeIsFinished()){
  747. //submit to task queue
  748. task.setProcessInstancePriority(processInstance.getProcessInstancePriority());
  749. submitTaskToQueue(task);
  750. }
  751. logger.info("submit task :{} state:{} complete, instance id:{} state: {} ",
  752. taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
  753. return task;
  754. }
  755. /**
  756. * set work process instance map
  757. * @param parentInstance
  758. * @param parentTask
  759. * @return
  760. */
  761. private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask){
  762. ProcessInstanceMap processMap = findWorkProcessMapByParent(parentInstance.getId(), parentTask.getId());
  763. if(processMap != null){
  764. return processMap;
  765. }else if(parentInstance.getCommandType() == CommandType.REPEAT_RUNNING
  766. || parentInstance.isComplementData()){
  767. // update current task id to map
  768. // repeat running does not generate new sub process instance
  769. processMap = findPreviousTaskProcessMap(parentInstance, parentTask);
  770. if(processMap!= null){
  771. processMap.setParentTaskInstanceId(parentTask.getId());
  772. updateWorkProcessInstanceMap(processMap);
  773. return processMap;
  774. }
  775. }
  776. // new task
  777. processMap = new ProcessInstanceMap();
  778. processMap.setParentProcessInstanceId(parentInstance.getId());
  779. processMap.setParentTaskInstanceId(parentTask.getId());
  780. createWorkProcessInstanceMap(processMap);
  781. return processMap;
  782. }
  783. /**
  784. * find previous task work process map.
  785. * @param parentProcessInstance
  786. * @param parentTask
  787. * @return
  788. */
  789. private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProcessInstance,
  790. TaskInstance parentTask) {
  791. Integer preTaskId = 0;
  792. List<TaskInstance> preTaskList = this.findPreviousTaskListByWorkProcessId(parentProcessInstance.getId());
  793. for(TaskInstance task : preTaskList){
  794. if(task.getName().equals(parentTask.getName())){
  795. preTaskId = task.getId();
  796. ProcessInstanceMap map = findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId);
  797. if(map!=null){
  798. return map;
  799. }
  800. }
  801. }
  802. logger.info("sub process instance is not found,parent task:{},parent instance:{}",
  803. parentTask.getId(), parentProcessInstance.getId());
  804. return null;
  805. }
  806. /**
  807. * create sub work process command
  808. * @param parentProcessInstance
  809. * @param instanceMap
  810. * @param childDefineId
  811. * @param task
  812. */
  813. private void createSubWorkProcessCommand(ProcessInstance parentProcessInstance,
  814. ProcessInstanceMap instanceMap,
  815. Integer childDefineId, TaskInstance task){
  816. ProcessInstance childInstance = findSubProcessInstance(parentProcessInstance.getId(), task.getId());
  817. CommandType fatherType = parentProcessInstance.getCommandType();
  818. CommandType commandType = fatherType;
  819. if(childInstance == null || commandType == CommandType.REPEAT_RUNNING){
  820. String fatherHistoryCommand = parentProcessInstance.getHistoryCmd();
  821. // sub process must begin with schedule/complement data
  822. // if father begin with scheduler/complement data
  823. if(fatherHistoryCommand.startsWith(CommandType.SCHEDULER.toString()) ||
  824. fatherHistoryCommand.startsWith(CommandType.COMPLEMENT_DATA.toString())){
  825. commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]);
  826. }
  827. }
  828. if(childInstance != null){
  829. childInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
  830. updateProcessInstance(childInstance);
  831. }
  832. // set sub work process command
  833. String processMapStr = JSONUtils.toJson(instanceMap);
  834. Map<String, String> cmdParam = JSONUtils.toMap(processMapStr);
  835. if(commandType == CommandType.COMPLEMENT_DATA ||
  836. (childInstance != null && childInstance.isComplementData())){
  837. Map<String, String> parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam());
  838. String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
  839. String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
  840. cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime);
  841. cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime);
  842. processMapStr = JSONUtils.toJson(cmdParam);
  843. }
  844. Command command = new Command();
  845. command.setWarningType(parentProcessInstance.getWarningType());
  846. command.setWarningGroupId(parentProcessInstance.getWarningGroupId());
  847. command.setFailureStrategy(parentProcessInstance.getFailureStrategy());
  848. command.setProcessDefinitionId(childDefineId);
  849. command.setScheduleTime(parentProcessInstance.getScheduleTime());
  850. command.setExecutorId(parentProcessInstance.getExecutorId());
  851. command.setCommandParam(processMapStr);
  852. command.setCommandType(commandType);
  853. command.setProcessInstancePriority(parentProcessInstance.getProcessInstancePriority());
  854. createCommand(command);
  855. logger.info("sub process command created: {} ", command.toString());
  856. }
  857. /**
  858. * submit task to mysql
  859. * @param taskInstance
  860. * @return
  861. */
  862. public TaskInstance submitTaskInstanceToMysql(TaskInstance taskInstance, ProcessInstance processInstance){
  863. ExecutionStatus processInstanceState = processInstance.getState();
  864. if(taskInstance.getState().typeIsFailure()){
  865. if(taskInstance.isSubProcess()){
  866. taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1 );
  867. }else {
  868. if( processInstanceState != ExecutionStatus.READY_STOP
  869. && processInstanceState != ExecutionStatus.READY_PAUSE){
  870. // failure task set invalid
  871. taskInstance.setFlag(Flag.NO);
  872. updateTaskInstance(taskInstance);
  873. // crate new task instance
  874. if(taskInstance.getState() != ExecutionStatus.NEED_FAULT_TOLERANCE){
  875. taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1 );
  876. }
  877. taskInstance.setEndTime(null);
  878. taskInstance.setStartTime(new Date());
  879. taskInstance.setFlag(Flag.YES);
  880. taskInstance.setHost(null);
  881. taskInstance.setId(0);
  882. }
  883. }
  884. }
  885. taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
  886. taskInstance.setState(getSubmitTaskState(taskInstance, processInstanceState));
  887. taskInstance.setSubmitTime(new Date());
  888. saveTaskInstance(taskInstance);
  889. return taskInstance;
  890. }
  891. /**
  892. * submit task to queue
  893. * @param task
  894. */
  895. public Boolean submitTaskToQueue(TaskInstance task) {
  896. try{
  897. // task cannot submit when running
  898. if(task.getState() == ExecutionStatus.RUNNING_EXEUTION){
  899. logger.info(String.format("submit to task queue, but task [%s] state already be running. ", task.getName()));
  900. return true;
  901. }
  902. if(checkTaskExistsInTaskQueue(task)){
  903. logger.info(String.format("submit to task queue, but task [%s] already exists in the queue.", task.getName()));
  904. return true;
  905. }
  906. logger.info("task ready to queue: {}" , task);
  907. taskQueue.add(SCHEDULER_TASKS_QUEUE, taskZkInfo(task));
  908. logger.info(String.format("master insert into queue success, task : %s", task.getName()) );
  909. return true;
  910. }catch (Exception e){
  911. logger.error("submit task to queue Exception: ", e);
  912. logger.error("task queue error : %s", JSONUtils.toJson(task));
  913. return false;
  914. }
  915. }
  916. /**
  917. * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${task executed by ip1},${ip2}...
  918. *
  919. * The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low.
  920. *
  921. * 流程实例优先级_流程实例id_任务优先级_任务id_任务执行机器ip1,ip2... high <- low
  922. *
  923. * @param taskInstance
  924. * @return
  925. */
  926. private String taskZkInfo(TaskInstance taskInstance) {
  927. int taskWorkerGroupId = getTaskWorkerGroupId(taskInstance);
  928. StringBuilder sb = new StringBuilder(100);
  929. sb.append(taskInstance.getProcessInstancePriority().ordinal()).append(Constants.UNDERLINE)
  930. .append(taskInstance.getProcessInstanceId()).append(Constants.UNDERLINE)
  931. .append(taskInstance.getTaskInstancePriority().ordinal()).append(Constants.UNDERLINE)
  932. .append(taskInstance.getId()).append(Constants.UNDERLINE);
  933. if(taskWorkerGroupId > 0){
  934. //not to find data from db
  935. WorkerGroup workerGroup = queryWorkerGroupById(taskWorkerGroupId);
  936. if(workerGroup == null ){
  937. logger.info("task {} cannot find the worker group, use all worker instead.", taskInstance.getId());
  938. sb.append(Constants.DEFAULT_WORKER_ID);
  939. return sb.toString();
  940. }
  941. String ips = workerGroup.getIpList();
  942. if(StringUtils.isBlank(ips)){
  943. logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers",
  944. taskInstance.getId(), workerGroup.getId());
  945. sb.append(Constants.DEFAULT_WORKER_ID);
  946. return sb.toString();
  947. }
  948. StringBuilder ipSb = new StringBuilder(100);
  949. String[] ipArray = ips.split(COMMA);
  950. for (String ip : ipArray) {
  951. long ipLong = IpUtils.ipToLong(ip);
  952. ipSb.append(ipLong).append(COMMA);
  953. }
  954. if(ipSb.length() > 0) {
  955. ipSb.deleteCharAt(ipSb.length() - 1);
  956. }
  957. sb.append(ipSb);
  958. }else{
  959. sb.append(Constants.DEFAULT_WORKER_ID);
  960. }
  961. return sb.toString();
  962. }
  963. /**
  964. * get submit task instance state by the work process state
  965. * cannot modify the task state when running/kill/submit success, or this
  966. * task instance is already exists in task queue .
  967. * return pause if work process state is ready pause
  968. * return stop if work process state is ready stop
  969. * if all of above are not satisfied, return submit success
  970. *
  971. * @param taskInstance
  972. * @param processInstanceState
  973. * @return
  974. */
  975. public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState){
  976. ExecutionStatus state = taskInstance.getState();
  977. if(
  978. // running or killed
  979. // the task already exists in task queue
  980. // return state
  981. state == ExecutionStatus.RUNNING_EXEUTION
  982. || state == ExecutionStatus.KILL
  983. || checkTaskExistsInTaskQueue(taskInstance)
  984. ){
  985. return state;
  986. }
  987. //return pasue /stop if process instance state is ready pause / stop
  988. // or return submit success
  989. if( processInstanceState == ExecutionStatus.READY_PAUSE){
  990. state = ExecutionStatus.PAUSE;
  991. }else if(processInstanceState == ExecutionStatus.READY_STOP) {
  992. state = ExecutionStatus.KILL;
  993. }else{
  994. state = ExecutionStatus.SUBMITTED_SUCCESS;
  995. }
  996. return state;
  997. }
  998. /**
  999. * check the task instance existing in queue
  1000. * @return
  1001. */
  1002. public boolean checkTaskExistsInTaskQueue(TaskInstance task){
  1003. if(task.isSubProcess()){
  1004. return false;
  1005. }
  1006. String taskZkInfo = taskZkInfo(task);
  1007. return taskQueue.checkTaskExists(SCHEDULER_TASKS_QUEUE, taskZkInfo);
  1008. }
  1009. /**
  1010. * create a new process instance
  1011. * @param processInstance
  1012. */
  1013. public void createProcessInstance(ProcessInstance processInstance){
  1014. if (processInstance != null){
  1015. processInstanceMapper.insert(processInstance);
  1016. }
  1017. }
  1018. /**
  1019. * insert or update work process instance to data base
  1020. * @param workProcessInstance
  1021. */
  1022. public void saveProcessInstance(ProcessInstance workProcessInstance){
  1023. if (workProcessInstance == null){
  1024. logger.error("save error, process instance is null!");
  1025. return ;
  1026. }
  1027. //创建流程实例
  1028. if(workProcessInstance.getId() != 0){
  1029. processInstanceMapper.update(workProcessInstance);
  1030. }else{
  1031. createProcessInstance(workProcessInstance);
  1032. }
  1033. }
  1034. /**
  1035. * insert or update command
  1036. * @param command
  1037. * @return
  1038. */
  1039. public int saveCommand(Command command){
  1040. if(command.getId() != 0){
  1041. return commandMapper.update(command);
  1042. }else{
  1043. return commandMapper.insert(command);
  1044. }
  1045. }
  1046. /**
  1047. * insert or update task instance
  1048. * @param taskInstance
  1049. * @return
  1050. */
  1051. public boolean saveTaskInstance(TaskInstance taskInstance){
  1052. if(taskInstance.getId() != 0){
  1053. return updateTaskInstance(taskInstance);
  1054. }else{
  1055. return createTaskInstance(taskInstance);
  1056. }
  1057. }
  1058. /**
  1059. * insert task instance
  1060. * @param taskInstance
  1061. * @return
  1062. */
  1063. public boolean createTaskInstance(TaskInstance taskInstance) {
  1064. int count = taskInstanceMapper.insert(taskInstance);
  1065. return count > 0;
  1066. }
  1067. /**
  1068. * update task instance
  1069. * @param taskInstance
  1070. * @return
  1071. */
  1072. public boolean updateTaskInstance(TaskInstance taskInstance){
  1073. int count = taskInstanceMapper.update(taskInstance);
  1074. return count > 0;
  1075. }
  /**
   * Deletes a command by id through {@code commandMapper.delete}.
   *
   * @param id id of the command to delete
   */
  public void delCommandByid(int id) {
    commandMapper.delete(id);
  }
  /**
   * Looks up a task instance by id through {@code taskInstanceMapper.queryById}.
   *
   * @param taskId id of the task instance
   * @return whatever the mapper returns for that id
   */
  public TaskInstance findTaskInstanceById(Integer taskId){
    return taskInstanceMapper.queryById(taskId);
  }
  /**
   * Gets the ids of the tasks of a process instance that are in the given state.
   *
   * @param instanceId process instance id
   * @param state task state to match; its {@code ordinal()} is what is passed to the mapper
   * @return the id list returned by {@code taskInstanceMapper.queryTaskByProcessIdAndState}
   */
  public List<Integer> findTaskIdByInstanceState(int instanceId, ExecutionStatus state){
    return taskInstanceMapper.queryTaskByProcessIdAndState(instanceId, state.ordinal());
  }
  /**
   * Finds the task instances of a process instance that carry {@code Flag.YES}.
   *
   * @param processInstanceId process instance id
   * @return the list returned by {@code taskInstanceMapper.findValidTaskListByProcessId}
   */
  public List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId){
    return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES);
  }
  /**
   * Finds the previous task instances of a work process instance, i.e. the ones the mapper
   * returns for {@code Flag.NO}.
   *
   * @param workProcessInstanceId process instance id
   * @return the list returned by {@code taskInstanceMapper.findValidTaskListByProcessId}
   */
  public List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer workProcessInstanceId){
    return taskInstanceMapper.findValidTaskListByProcessId(workProcessInstanceId, Flag.NO);
  }
  /**
   * Updates a work process instance map.
   *
   * @param processInstanceMap map to update
   * @return the value returned by {@code processInstanceMapMapper.update}
   */
  public int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap){
    return processInstanceMapMapper.update(processInstanceMap);
  }
  1120. /**
  1121. * create work process instance map
  1122. * @param processInstanceMap
  1123. * @return
  1124. */
  1125. public int createWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap){
  1126. Integer count = 0;
  1127. if(processInstanceMap !=null){
  1128. return processInstanceMapMapper.insert(processInstanceMap);
  1129. }
  1130. return count;
  1131. }
  /**
   * Finds a work process map by parent process instance id and parent task instance id.
   *
   * @param parentWorkProcessId parent process instance id
   * @param parentTaskId parent task instance id
   * @return the result of {@code processInstanceMapMapper.queryByParentId}
   */
  public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId){
    return processInstanceMapMapper.queryByParentId(parentWorkProcessId, parentTaskId);
  }
  /**
   * Deletes the work process maps that belong to a parent process instance.
   *
   * @param parentWorkProcessId parent process instance id
   * @return the value returned by {@code processInstanceMapMapper.deleteByParentProcessId}
   */
  public int deleteWorkProcessMapByParentId(int parentWorkProcessId){
    return processInstanceMapMapper.deleteByParentProcessId(parentWorkProcessId);
  }
  1149. public ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId){
  1150. ProcessInstance processInstance = null;
  1151. ProcessInstanceMap processInstanceMap = processInstanceMapMapper.queryByParentId(parentProcessId, parentTaskId);
  1152. if(processInstanceMap == null || processInstanceMap.getProcessInstanceId() == 0){
  1153. return processInstance;
  1154. }
  1155. processInstance = findProcessInstanceById(processInstanceMap.getProcessInstanceId());
  1156. return processInstance;
  1157. }
  1158. public ProcessInstance findParentProcessInstance(Integer subProcessId) {
  1159. ProcessInstance processInstance = null;
  1160. ProcessInstanceMap processInstanceMap = processInstanceMapMapper.queryBySubProcessId(subProcessId);
  1161. if(processInstanceMap == null || processInstanceMap.getProcessInstanceId() == 0){
  1162. return processInstance;
  1163. }
  1164. processInstance = findProcessInstanceById(processInstanceMap.getParentProcessInstanceId());
  1165. return processInstance;
  1166. }
  1167. /**
  1168. * change task state
  1169. * @param state
  1170. * @param startTime
  1171. * @param host
  1172. * @param executePath
  1173. */
  1174. public void changeTaskState(ExecutionStatus state, Date startTime, String host,
  1175. String executePath,
  1176. String logPath,
  1177. int taskInstId) {
  1178. TaskInstance taskInstance = taskInstanceMapper.queryById(taskInstId);
  1179. taskInstance.setState(state);
  1180. taskInstance.setStartTime(startTime);
  1181. taskInstance.setHost(host);
  1182. taskInstance.setExecutePath(executePath);
  1183. taskInstance.setLogPath(logPath);
  1184. saveTaskInstance(taskInstance);
  1185. }
  /**
   * Updates a process instance.
   *
   * @param instance process instance to update
   * @return the value returned by {@code processInstanceMapper.update}
   */
  public int updateProcessInstance(ProcessInstance instance){
    return processInstanceMapper.update(instance);
  }
  /**
   * Updates selected fields of a process instance through
   * {@code processInstanceMapper.updateProcessInstance}.
   *
   * @param processInstanceId id of the process instance to update
   * @param processJson process JSON to store
   * @param globalParams global parameters to store
   * @param scheduleTime schedule time to store
   * @param flag flag to store
   * @param locations locations value to store
   * @param connects connects value to store
   * @return the value returned by the mapper
   */
  public int updateProcessInstance(Integer processInstanceId, String processJson,
                                   String globalParams, Date scheduleTime, Flag flag,
                                   String locations, String connects){
    // note: the mapper takes flag as its LAST argument, unlike this method's parameter order
    return processInstanceMapper.updateProcessInstance(processInstanceId, processJson,
        globalParams, scheduleTime, locations, connects, flag);
  }
  1211. /**
  1212. * change task state
  1213. * @param state
  1214. * @param endTime
  1215. */
  1216. public void changeTaskState(ExecutionStatus state,
  1217. Date endTime,
  1218. int taskInstId) {
  1219. TaskInstance taskInstance = taskInstanceMapper.queryById(taskInstId);
  1220. taskInstance.setState(state);
  1221. taskInstance.setEndTime(endTime);
  1222. saveTaskInstance(taskInstance);
  1223. }
  1224. /**
  1225. * convert integer list to string list
  1226. * @param intList
  1227. * @return
  1228. */
  1229. public List<String> convertIntListToString(List<Integer> intList){
  1230. if(intList == null){
  1231. return new ArrayList<>();
  1232. }
  1233. List<String> result = new ArrayList<String>(intList.size());
  1234. for(Integer intVar : intList){
  1235. result.add(String.valueOf(intVar));
  1236. }
  1237. return result;
  1238. }
  1239. /**
  1240. * set task
  1241. * 根据任务实例id设置pid
  1242. * @param taskInstId
  1243. * @param pid
  1244. */
  1245. public void updatePidByTaskInstId(int taskInstId, int pid) {
  1246. TaskInstance taskInstance = taskInstanceMapper.queryById(taskInstId);
  1247. taskInstance.setPid(pid);
  1248. taskInstance.setAppLink("");
  1249. saveTaskInstance(taskInstance);
  1250. }
  1251. /**
  1252. * update pid and app links field by task instance id
  1253. * @param taskInstId
  1254. * @param pid
  1255. */
  1256. public void updatePidByTaskInstId(int taskInstId, int pid,String appLinks) {
  1257. TaskInstance taskInstance = taskInstanceMapper.queryById(taskInstId);
  1258. taskInstance.setPid(pid);
  1259. taskInstance.setAppLink(appLinks);
  1260. saveTaskInstance(taskInstance);
  1261. }
  1262. /**
  1263. * query ProcessDefinition by name
  1264. *
  1265. * @see ProcessDefinition
  1266. */
  1267. public ProcessDefinition findProcessDefineByName(int projectId, String name) {
  1268. ProcessDefinition projectFlow = processDefineMapper.queryByDefineName(projectId, name);
  1269. return projectFlow;
  1270. }
  /**
   * Queries a schedule by id through {@code scheduleMapper.queryById}.
   *
   * @param id schedule id
   * @return whatever the mapper returns for that id
   * @see Schedule
   */
  public Schedule querySchedule(int id) {
    return scheduleMapper.queryById(id);
  }
  /**
   * Queries the process instances on a host that need failover, i.e. those the mapper returns
   * for the host and the states in {@code stateArray}.
   *
   * @param host host to query
   * @return the list returned by {@code processInstanceMapper.queryByHostAndStatus}
   */
  public List<ProcessInstance> queryNeedFailoverProcessInstances(String host){
    return processInstanceMapper.queryByHostAndStatus(host, stateArray);
  }
  /**
   * Marks the process instances of a host for failover by calling
   * {@code processInstanceMapper.setFailoverByHostAndStateArray} with {@code stateArray}.
   * NOTE(review): the original comment says this sets the host to null; the mapper is not
   * visible here, so confirm against its SQL.
   *
   * @param host host whose process instances are updated
   * @return the value returned by the mapper
   */
  public int updateNeddFailoverProcessInstances(String host){
    return processInstanceMapper.setFailoverByHostAndStateArray(host, stateArray);
  }
  1290. /**
  1291. * process need failover process instance
  1292. * @param processInstance
  1293. */
  1294. @Transactional(value = "TransactionManager",rollbackFor = Exception.class)
  1295. public void processNeedFailoverProcessInstances(ProcessInstance processInstance){
  1296. //1 update processInstance host is null
  1297. processInstance.setHost("null");
  1298. processInstanceMapper.update(processInstance);
  1299. //2 insert into recover command
  1300. Command cmd = new Command();
  1301. cmd.setProcessDefinitionId(processInstance.getProcessDefinitionId());
  1302. cmd.setCommandParam(String.format("{\"%s\":%d}", Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
  1303. cmd.setExecutorId(processInstance.getExecutorId());
  1304. cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
  1305. createCommand(cmd);
  1306. }
  /**
   * Queries the task instances on a host that need failover, i.e. those the mapper returns
   * for the host and the states in {@code stateArray}.
   *
   * @param host host to query
   * @return the list returned by {@code taskInstanceMapper.queryByHostAndStatus}
   */
  public List<TaskInstance> queryNeedFailoverTaskInstances(String host){
    return taskInstanceMapper.queryByHostAndStatus(host, stateArray);
  }
  /**
   * Marks the task instances of a host for failover by calling
   * {@code taskInstanceMapper.setFailoverByHostAndStateArray} with {@code stateArray}.
   * NOTE(review): the original comment says this sets the host to null; the mapper is not
   * visible here, so confirm against its SQL.
   *
   * @param host host whose task instances are updated
   * @return the value returned by the mapper
   */
  public int updateNeedFailoverTaskInstances(String host){
    return taskInstanceMapper.setFailoverByHostAndStateArray(host, stateArray);
  }
  /**
   * Finds a data source by id through {@code dataSourceMapper.queryById}.
   *
   * @param id data source id
   * @return whatever the mapper returns for that id
   */
  public DataSource findDataSourceById(int id){
    return dataSourceMapper.queryById(id);
  }
  /**
   * Updates the state of a process instance by id.
   *
   * @param processInstanceId id of the process instance
   * @param executionStatus state to store
   * @return the value returned by {@code processInstanceMapper.updateState}
   */
  public int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) {
    return processInstanceMapper.updateState(processInstanceId, executionStatus);
  }
  /**
   * Finds the process instance that belongs to a task id.
   *
   * @param taskId task id to look up
   * @return the result of {@code processInstanceMapper.queryByTaskId}
   */
  public ProcessInstance findProcessInstanceByTaskId(int taskId){
    return processInstanceMapper.queryByTaskId(taskId);
  }
  /**
   * Finds UDF functions by an id list given as a string.
   *
   * @param ids id list string, passed unchanged to the mapper
   *            (NOTE(review): expected separator/format is defined by the mapper — confirm)
   * @return the list returned by {@code udfFuncMapper.queryUdfByIdStr}
   */
  public List<UdfFunc> queryUdfFunListByids(String ids){
    return udfFuncMapper.queryUdfByIdStr(ids);
  }
  /**
   * Finds the tenant code for a resource name.
   *
   * @param resName resource name
   * @return the result of {@code resourceMapper.queryTenantCodeByResourceName}
   */
  public String queryTenantCodeByResName(String resName){
    return resourceMapper.queryTenantCodeByResourceName(resName);
  }
  /**
   * Finds the schedules that belong to the given process definition ids.
   *
   * @param ids process definition ids
   * @return the list returned by {@code scheduleMapper.selectAllByProcessDefineArray}
   */
  public List<Schedule> selectAllByProcessDefineId(int[] ids){
    return scheduleMapper.selectAllByProcessDefineArray(ids);
  }
  1372. /**
  1373. * get dependency cycle by work process define id and scheduler fire time
  1374. *
  1375. * @param masterId
  1376. * @param processDefinitionId
  1377. * @param scheduledFireTime 任务调度预计触发的时间
  1378. * @return
  1379. * @throws Exception
  1380. */
  1381. public CycleDependency getCycleDependency(int masterId, int processDefinitionId, Date scheduledFireTime) throws Exception {
  1382. List<CycleDependency> list = getCycleDependencies(masterId,new int[]{processDefinitionId},scheduledFireTime);
  1383. return list.size()>0 ? list.get(0) : null;
  1384. }
  1385. /**
  1386. *
  1387. * get dependency cycle list by work process define id list and scheduler fire time
  1388. * @param masterId
  1389. * @param ids
  1390. * @param scheduledFireTime 任务调度预计触发的时间
  1391. * @return
  1392. * @throws Exception
  1393. */
  1394. public List<CycleDependency> getCycleDependencies(int masterId,int[] ids,Date scheduledFireTime) throws Exception {
  1395. List<CycleDependency> cycleDependencyList = new ArrayList<CycleDependency>();
  1396. if(ArrayUtils.isEmpty(ids)){
  1397. logger.warn("ids[] is empty!is invalid!");
  1398. return cycleDependencyList;
  1399. }
  1400. if(scheduledFireTime == null){
  1401. logger.warn("scheduledFireTime is null!is invalid!");
  1402. return cycleDependencyList;
  1403. }
  1404. String strCrontab = "";
  1405. CronExpression depCronExpression;
  1406. Cron depCron;
  1407. List<Date> list;
  1408. List<Schedule> schedules = this.selectAllByProcessDefineId(ids);
  1409. // 遍历所有的调度信息
  1410. for(Schedule depSchedule:schedules){
  1411. strCrontab = depSchedule.getCrontab();
  1412. depCronExpression = CronUtils.parse2CronExpression(strCrontab);
  1413. depCron = CronUtils.parse2Cron(strCrontab);
  1414. CycleEnum cycleEnum = CronUtils.getMiniCycle(depCron);
  1415. if(cycleEnum == null){
  1416. logger.error("{} is not valid",strCrontab);
  1417. continue;
  1418. }
  1419. Calendar calendar = Calendar.getInstance();
  1420. switch (cycleEnum){
  1421. /*case MINUTE:
  1422. calendar.add(Calendar.MINUTE,-61);*/
  1423. case HOUR:
  1424. calendar.add(Calendar.HOUR,-25);
  1425. break;
  1426. case DAY:
  1427. calendar.add(Calendar.DATE,-32);
  1428. break;
  1429. case WEEK:
  1430. calendar.add(Calendar.DATE,-32);
  1431. break;
  1432. case MONTH:
  1433. calendar.add(Calendar.MONTH,-13);
  1434. break;
  1435. default:
  1436. logger.warn("Dependent process definition's cycleEnum is {},not support!!", cycleEnum.name());
  1437. continue;
  1438. }
  1439. Date start = calendar.getTime();
  1440. if(depSchedule.getProcessDefinitionId() == masterId){
  1441. list = CronUtils.getSelfFireDateList(start, scheduledFireTime, depCronExpression);
  1442. }else {
  1443. list = CronUtils.getFireDateList(start, scheduledFireTime, depCronExpression);
  1444. }
  1445. if(list.size()>=1){
  1446. start = list.get(list.size()-1);
  1447. CycleDependency dependency = new CycleDependency(depSchedule.getProcessDefinitionId(),start, CronUtils.getExpirationTime(start, cycleEnum), cycleEnum);
  1448. cycleDependencyList.add(dependency);
  1449. }
  1450. }
  1451. return cycleDependencyList;
  1452. }
  1453. /**
  1454. * find process instance by time interval
  1455. * @param defineId
  1456. * @param startTime
  1457. * @param endTime
  1458. * @return
  1459. */
  1460. public ProcessInstance findProcessInstanceByTimeInterval(int defineId, Date startTime, Date endTime, int excludeId) {
  1461. return processInstanceMapper.queryByScheduleTime(defineId, null, excludeId,
  1462. DateUtils.dateToString(startTime), DateUtils.dateToString(endTime));
  1463. }
  1464. public void selfFaultTolerant(int state){
  1465. List<ProcessInstance> processInstanceList = processInstanceMapper.listByStatus(new int[]{state});
  1466. for (ProcessInstance processInstance:processInstanceList){
  1467. selfFaultTolerant(processInstance);
  1468. }
  1469. }
  1470. /**
  1471. * master starup fault tolerant
  1472. */
  1473. public void masterStartupFaultTolerant(){
  1474. int[] readyStopAndKill=new int[]{ExecutionStatus.READY_PAUSE.ordinal(),ExecutionStatus.READY_STOP.ordinal(),
  1475. ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),ExecutionStatus.RUNNING_EXEUTION.ordinal()};
  1476. List<ProcessInstance> processInstanceList = processInstanceMapper.listByStatus(readyStopAndKill);
  1477. for (ProcessInstance processInstance:processInstanceList){
  1478. processNeedFailoverProcessInstances(processInstance);
  1479. }
  1480. }
  1481. @Transactional(value = "TransactionManager",rollbackFor = Exception.class)
  1482. public void selfFaultTolerant(ProcessInstance processInstance){
  1483. processInstance.setState(ExecutionStatus.FAILURE);
  1484. processInstanceMapper.update(processInstance);
  1485. // insert to command
  1486. Command command = new Command();
  1487. command.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
  1488. command.setProcessDefinitionId(processInstance.getProcessDefinitionId());
  1489. command.setCommandParam(String.format("{\"%s\":%d}",
  1490. CMDPARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
  1491. command.setExecutorId(processInstance.getExecutorId());
  1492. command.setProcessInstancePriority(processInstance.getProcessInstancePriority());
  1493. createCommand(command);
  1494. }
  1495. /**
  1496. * find last scheduler process instance in the date interval
  1497. * @param definitionId
  1498. * @param dateInterval
  1499. * @return
  1500. */
  1501. public ProcessInstance findLastSchedulerProcessInterval(int definitionId, DateInterval dateInterval) {
  1502. return processInstanceMapper.queryLastSchedulerProcess(definitionId,
  1503. DateUtils.dateToString(dateInterval.getStartTime()),
  1504. DateUtils.dateToString(dateInterval.getEndTime()));
  1505. }
  1506. public ProcessInstance findLastManualProcessInterval(int definitionId, DateInterval dateInterval) {
  1507. return processInstanceMapper.queryLastManualProcess(definitionId,
  1508. DateUtils.dateToString(dateInterval.getStartTime()),
  1509. DateUtils.dateToString(dateInterval.getEndTime()));
  1510. }
  1511. public ProcessInstance findLastRunningProcess(int definitionId, DateInterval dateInterval) {
  1512. return processInstanceMapper.queryLastRunningProcess(definitionId,
  1513. DateUtils.dateToString(dateInterval.getStartTime()),
  1514. DateUtils.dateToString(dateInterval.getEndTime()),
  1515. stateArray);
  1516. }
  1517. /**
  1518. * query user queue by process instance id
  1519. * @param processInstanceId
  1520. * @return
  1521. */
  1522. public String queryQueueByProcessInstanceId(int processInstanceId){
  1523. return userMapper.queryQueueByProcessInstanceId(processInstanceId);
  1524. }
  1525. /**
  1526. * query worker group by id
  1527. * @param workerGroupId
  1528. * @return
  1529. */
  1530. public WorkerGroup queryWorkerGroupById(int workerGroupId){
  1531. return workerGroupMapper.queryById(workerGroupId);
  1532. }
  1533. /**
  1534. * query worker server by host
  1535. * @param host
  1536. * @return
  1537. */
  1538. public List<WorkerServer> queryWorkerServerByHost(String host){
  1539. return workerServerMapper.queryWorkerByHost(host);
  1540. }
  1541. /**
  1542. * get task worker group id
  1543. *
  1544. * @param taskInstance
  1545. * @return
  1546. */
  1547. public int getTaskWorkerGroupId(TaskInstance taskInstance) {
  1548. int taskWorkerGroupId = taskInstance.getWorkerGroupId();
  1549. ProcessInstance processInstance = findProcessInstanceByTaskId(taskInstance.getId());
  1550. if(processInstance == null){
  1551. logger.error("cannot find the task:{} process instance", taskInstance.getId());
  1552. return Constants.DEFAULT_WORKER_ID;
  1553. }
  1554. int processWorkerGroupId = processInstance.getWorkerGroupId();
  1555. taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId);
  1556. return taskWorkerGroupId;
  1557. }
  1558. }