ProcessDefinitionService.java 47 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one or more
  3. * contributor license agreements. See the NOTICE file distributed with
  4. * this work for additional information regarding copyright ownership.
  5. * The ASF licenses this file to You under the Apache License, Version 2.0
  6. * (the "License"); you may not use this file except in compliance with
  7. * the License. You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package cn.escheduler.api.service;
  18. import cn.escheduler.api.dto.treeview.Instance;
  19. import cn.escheduler.api.dto.treeview.TreeViewDto;
  20. import cn.escheduler.api.enums.Status;
  21. import cn.escheduler.api.utils.Constants;
  22. import cn.escheduler.api.utils.PageInfo;
  23. import cn.escheduler.common.enums.*;
  24. import cn.escheduler.common.graph.DAG;
  25. import cn.escheduler.common.model.TaskNode;
  26. import cn.escheduler.common.model.TaskNodeRelation;
  27. import cn.escheduler.common.process.ProcessDag;
  28. import cn.escheduler.common.process.Property;
  29. import cn.escheduler.common.thread.Stopper;
  30. import cn.escheduler.common.utils.CollectionUtils;
  31. import cn.escheduler.common.utils.DateUtils;
  32. import cn.escheduler.common.utils.JSONUtils;
  33. import cn.escheduler.dao.ProcessDao;
  34. import cn.escheduler.dao.mapper.*;
  35. import cn.escheduler.dao.model.*;
  36. import com.alibaba.fastjson.JSON;
  37. import com.alibaba.fastjson.JSONArray;
  38. import com.alibaba.fastjson.JSONObject;
  39. import com.fasterxml.jackson.core.JsonProcessingException;
  40. import org.apache.commons.lang3.ObjectUtils;
  41. import org.apache.commons.lang3.StringUtils;
  42. import org.slf4j.Logger;
  43. import org.slf4j.LoggerFactory;
  44. import org.springframework.beans.factory.annotation.Autowired;
  45. import org.springframework.http.MediaType;
  46. import org.springframework.stereotype.Service;
  47. import org.springframework.transaction.annotation.Transactional;
  48. import org.springframework.web.multipart.MultipartFile;
  49. import javax.servlet.ServletOutputStream;
  50. import javax.servlet.http.HttpServletResponse;
  51. import java.io.BufferedOutputStream;
  52. import java.io.BufferedReader;
  53. import java.io.IOException;
  54. import java.io.InputStreamReader;
  55. import java.util.*;
  56. import java.util.concurrent.ConcurrentHashMap;
  57. import static cn.escheduler.api.enums.Status.UPDATE_PROCESS_DEFINITION_ERROR;
  58. import static cn.escheduler.api.service.SchedulerService.deleteSchedule;
  59. import static cn.escheduler.api.utils.CheckUtils.checkOtherParams;
  60. import static cn.escheduler.api.utils.CheckUtils.checkTaskNodeParameters;
  61. import static cn.escheduler.common.Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID;
  62. /**
  63. * process definition service
  64. */
  65. @Service
  66. public class ProcessDefinitionService extends BaseDAGService {
  67. private static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionService.class);
  68. @Autowired
  69. private ProjectMapper projectMapper;
  70. @Autowired
  71. private ProjectService projectService;
  72. @Autowired
  73. private ProcessDefinitionMapper processDefineMapper;
  74. @Autowired
  75. private ProcessInstanceMapper processInstanceMapper;
  76. @Autowired
  77. private TaskInstanceMapper taskInstanceMapper;
  78. @Autowired
  79. private ScheduleMapper scheduleMapper;
  80. @Autowired
  81. private ProcessDao processDao;
  82. @Autowired
  83. private DataSourceMapper dataSourceMapper;
  84. @Autowired
  85. private WorkerGroupMapper workerGroupMapper;
  86. /**
  87. * create process definition
  88. *
  89. * @param loginUser
  90. * @param projectName
  91. * @param name
  92. * @param processDefinitionJson
  93. * @param desc
  94. * @param locations
  95. * @param connects
  96. * @return
  97. */
  98. public Map<String, Object> createProcessDefinition(User loginUser, String projectName, String name,
  99. String processDefinitionJson, String desc, String locations, String connects) throws JsonProcessingException {
  100. Map<String, Object> result = new HashMap<>(5);
  101. Project project = projectMapper.queryByName(projectName);
  102. // check project auth
  103. Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
  104. Status resultStatus = (Status) checkResult.get(Constants.STATUS);
  105. if (resultStatus != Status.SUCCESS) {
  106. return checkResult;
  107. }
  108. ProcessDefinition processDefine = new ProcessDefinition();
  109. Date now = new Date();
  110. ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
  111. Map<String, Object> checkProcessJson = checkProcessNodeList(processData, processDefinitionJson);
  112. if (checkProcessJson.get(Constants.STATUS) != Status.SUCCESS) {
  113. return checkProcessJson;
  114. }
  115. processDefine.setName(name);
  116. processDefine.setReleaseState(ReleaseState.OFFLINE);
  117. processDefine.setProjectId(project.getId());
  118. processDefine.setUserId(loginUser.getId());
  119. processDefine.setProcessDefinitionJson(processDefinitionJson);
  120. processDefine.setDesc(desc);
  121. processDefine.setLocations(locations);
  122. processDefine.setConnects(connects);
  123. processDefine.setTimeout(processData.getTimeout());
  124. processDefine.setTenantId(processData.getTenantId());
  125. //custom global params
  126. List<Property> globalParamsList = processData.getGlobalParams();
  127. if (globalParamsList != null && globalParamsList.size() > 0) {
  128. Set<Property> globalParamsSet = new HashSet<>(globalParamsList);
  129. globalParamsList = new ArrayList<>(globalParamsSet);
  130. processDefine.setGlobalParamList(globalParamsList);
  131. }
  132. processDefine.setCreateTime(now);
  133. processDefine.setUpdateTime(now);
  134. processDefine.setFlag(Flag.YES);
  135. processDefineMapper.insert(processDefine);
  136. putMsg(result, Status.SUCCESS);
  137. result.put("processDefinitionId",processDefine.getId());
  138. return result;
  139. }
  140. /**
  141. * query proccess definition list
  142. *
  143. * @param loginUser
  144. * @param projectName
  145. * @return
  146. */
  147. public Map<String, Object> queryProccessDefinitionList(User loginUser, String projectName) {
  148. HashMap<String, Object> result = new HashMap<>(5);
  149. Project project = projectMapper.queryByName(projectName);
  150. Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
  151. Status resultStatus = (Status) checkResult.get(Constants.STATUS);
  152. if (resultStatus != Status.SUCCESS) {
  153. return checkResult;
  154. }
  155. List<ProcessDefinition> resourceList = processDefineMapper.queryAllDefinitionList(project.getId());
  156. result.put(Constants.DATA_LIST, resourceList);
  157. putMsg(result, Status.SUCCESS);
  158. return result;
  159. }
  160. /**
  161. * query proccess definition list paging
  162. *
  163. * @param loginUser
  164. * @param projectName
  165. * @param searchVal
  166. * @param pageNo
  167. * @param pageSize
  168. * @param userId
  169. * @return
  170. */
  171. public Map<String, Object> queryProcessDefinitionListPaging(User loginUser, String projectName, String searchVal, Integer pageNo, Integer pageSize, Integer userId) {
  172. Map<String, Object> result = new HashMap<>(5);
  173. Project project = projectMapper.queryByName(projectName);
  174. Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
  175. Status resultStatus = (Status) checkResult.get(Constants.STATUS);
  176. if (resultStatus != Status.SUCCESS) {
  177. return checkResult;
  178. }
  179. Integer count = processDefineMapper.countDefineNumber(project.getId(), userId, searchVal);
  180. PageInfo pageInfo = new PageInfo<ProcessData>(pageNo, pageSize);
  181. List<ProcessDefinition> resourceList = processDefineMapper.queryDefineListPaging(project.getId(),
  182. searchVal, userId, pageInfo.getStart(), pageSize);
  183. pageInfo.setTotalCount(count);
  184. pageInfo.setLists(resourceList);
  185. result.put(Constants.DATA_LIST, pageInfo);
  186. putMsg(result, Status.SUCCESS);
  187. return result;
  188. }
  189. /**
  190. * query datail of process definition
  191. *
  192. * @param loginUser
  193. * @param projectName
  194. * @param processId
  195. * @return
  196. */
  197. public Map<String, Object> queryProccessDefinitionById(User loginUser, String projectName, Integer processId) {
  198. Map<String, Object> result = new HashMap<>(5);
  199. Project project = projectMapper.queryByName(projectName);
  200. Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
  201. Status resultStatus = (Status) checkResult.get(Constants.STATUS);
  202. if (resultStatus != Status.SUCCESS) {
  203. return checkResult;
  204. }
  205. ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processId);
  206. if (processDefinition == null) {
  207. putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processId);
  208. } else {
  209. result.put(Constants.DATA_LIST, processDefinition);
  210. putMsg(result, Status.SUCCESS);
  211. }
  212. return result;
  213. }
  214. /**
  215. * update process definition
  216. *
  217. * @param loginUser
  218. * @param projectName
  219. * @param id
  220. * @param name
  221. * @param processDefinitionJson
  222. * @param desc
  223. * @param locations
  224. * @param connects
  225. * @return
  226. */
  227. public Map<String, Object> updateProcessDefinition(User loginUser, String projectName, int id, String name,
  228. String processDefinitionJson, String desc,
  229. String locations, String connects) {
  230. Map<String, Object> result = new HashMap<>(5);
  231. Project project = projectMapper.queryByName(projectName);
  232. Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
  233. Status resultStatus = (Status) checkResult.get(Constants.STATUS);
  234. if (resultStatus != Status.SUCCESS) {
  235. return checkResult;
  236. }
  237. ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
  238. Map<String, Object> checkProcessJson = checkProcessNodeList(processData, processDefinitionJson);
  239. if ((checkProcessJson.get(Constants.STATUS) != Status.SUCCESS)) {
  240. return checkProcessJson;
  241. }
  242. ProcessDefinition processDefinition = processDao.findProcessDefineById(id);
  243. if (processDefinition == null) {
  244. // check process definition exists
  245. putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, id);
  246. return result;
  247. } else if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
  248. // online can not permit edit
  249. putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefinition.getName());
  250. return result;
  251. } else {
  252. putMsg(result, Status.SUCCESS);
  253. }
  254. ProcessDefinition processDefine = processDao.findProcessDefineById(id);
  255. Date now = new Date();
  256. processDefine.setId(id);
  257. processDefine.setName(name);
  258. processDefine.setReleaseState(ReleaseState.OFFLINE);
  259. processDefine.setProjectId(project.getId());
  260. processDefine.setProcessDefinitionJson(processDefinitionJson);
  261. processDefine.setDesc(desc);
  262. processDefine.setLocations(locations);
  263. processDefine.setConnects(connects);
  264. processDefine.setTimeout(processData.getTimeout());
  265. processDefine.setTenantId(processData.getTenantId());
  266. //custom global params
  267. List<Property> globalParamsList = new ArrayList<>();
  268. if (processData.getGlobalParams() != null && processData.getGlobalParams().size() > 0) {
  269. Set<Property> userDefParamsSet = new HashSet<>(processData.getGlobalParams());
  270. globalParamsList = new ArrayList<>(userDefParamsSet);
  271. }
  272. processDefine.setGlobalParamList(globalParamsList);
  273. processDefine.setUpdateTime(now);
  274. processDefine.setFlag(Flag.YES);
  275. if (processDefineMapper.update(processDefine) > 0) {
  276. putMsg(result, Status.SUCCESS);
  277. } else {
  278. putMsg(result, UPDATE_PROCESS_DEFINITION_ERROR);
  279. }
  280. return result;
  281. }
  282. /**
  283. * verify process definition name unique
  284. *
  285. * @param loginUser
  286. * @param projectName
  287. * @param name
  288. * @return
  289. */
  290. public Map<String, Object> verifyProccessDefinitionName(User loginUser, String projectName, String name) {
  291. Map<String, Object> result = new HashMap<>();
  292. Project project = projectMapper.queryByName(projectName);
  293. Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
  294. Status resultEnum = (Status) checkResult.get(Constants.STATUS);
  295. if (resultEnum != Status.SUCCESS) {
  296. return checkResult;
  297. }
  298. ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(project.getId(), name);
  299. if (processDefinition == null) {
  300. putMsg(result, Status.SUCCESS);
  301. } else {
  302. putMsg(result, Status.PROCESS_INSTANCE_EXIST, name);
  303. }
  304. return result;
  305. }
  306. /**
  307. * delete process definition by id
  308. *
  309. * @param loginUser
  310. * @param projectName
  311. * @param processDefinitionId
  312. * @return
  313. */
  314. @Transactional(value = "TransactionManager", rollbackFor = Exception.class)
  315. public Map<String, Object> deleteProcessDefinitionById(User loginUser, String projectName, Integer processDefinitionId) {
  316. Map<String, Object> result = new HashMap<>(5);
  317. Project project = projectMapper.queryByName(projectName);
  318. Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
  319. Status resultEnum = (Status) checkResult.get(Constants.STATUS);
  320. if (resultEnum != Status.SUCCESS) {
  321. return checkResult;
  322. }
  323. ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefinitionId);
  324. if (processDefinition == null) {
  325. putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionId);
  326. return result;
  327. }
  328. // Determine if the login user is the owner of the process definition
  329. if (loginUser.getId() != processDefinition.getUserId() && loginUser.getUserType() != UserType.ADMIN_USER) {
  330. putMsg(result, Status.USER_NO_OPERATION_PERM);
  331. return result;
  332. }
  333. // check process definition is already online
  334. if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
  335. putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE,processDefinitionId);
  336. return result;
  337. }
  338. // get the timing according to the process definition
  339. List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId);
  340. if (!schedules.isEmpty() && schedules.size() > 1) {
  341. logger.warn("scheduler num is {},Greater than 1",schedules.size());
  342. putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR);
  343. return result;
  344. }else if(schedules.size() == 1){
  345. Schedule schedule = schedules.get(0);
  346. if(schedule.getReleaseState() == ReleaseState.OFFLINE){
  347. scheduleMapper.delete(schedule.getId());
  348. }else if(schedule.getReleaseState() == ReleaseState.ONLINE){
  349. putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE,schedule.getId());
  350. return result;
  351. }
  352. }
  353. int delete = processDefineMapper.delete(processDefinitionId);
  354. if (delete > 0) {
  355. putMsg(result, Status.SUCCESS);
  356. } else {
  357. putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR);
  358. }
  359. return result;
  360. }
  361. /**
  362. * batch delete process definition by ids
  363. *
  364. * @param loginUser
  365. * @param projectName
  366. * @param processDefinitionIds
  367. * @return
  368. */
  369. public Map<String, Object> batchDeleteProcessDefinitionByIds(User loginUser, String projectName, String processDefinitionIds) {
  370. Map<String, Object> result = new HashMap<>(5);
  371. Map<String, Object> deleteReuslt = new HashMap<>(5);
  372. List<Integer> deleteFailedIdList = new ArrayList<Integer>();
  373. Project project = projectMapper.queryByName(projectName);
  374. Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
  375. Status resultEnum = (Status) checkResult.get(Constants.STATUS);
  376. if (resultEnum != Status.SUCCESS) {
  377. return checkResult;
  378. }
  379. if(StringUtils.isNotEmpty(processDefinitionIds)){
  380. String[] processInstanceIdArray = processDefinitionIds.split(",");
  381. for (String strProcessInstanceId:processInstanceIdArray) {
  382. int processInstanceId = Integer.parseInt(strProcessInstanceId);
  383. try {
  384. deleteReuslt = deleteProcessDefinitionById(loginUser, projectName, processInstanceId);
  385. if(!Status.SUCCESS.equals(deleteReuslt.get(Constants.STATUS))){
  386. deleteFailedIdList.add(processInstanceId);
  387. logger.error((String)deleteReuslt.get(Constants.MSG));
  388. }
  389. } catch (Exception e) {
  390. deleteFailedIdList.add(processInstanceId);
  391. }
  392. }
  393. }
  394. if(deleteFailedIdList.size() > 0){
  395. putMsg(result, Status.BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR,StringUtils.join(deleteFailedIdList.toArray(),","));
  396. }else{
  397. putMsg(result, Status.SUCCESS);
  398. }
  399. return result;
  400. }
  401. /**
  402. * release process definition: online / offline
  403. *
  404. * @param loginUser
  405. * @param projectName
  406. * @param id
  407. * @param releaseState
  408. * @return
  409. */
  410. @Transactional(value = "TransactionManager", rollbackFor = Exception.class)
  411. public Map<String, Object> releaseProcessDefinition(User loginUser, String projectName, int id, int releaseState) {
  412. HashMap<String, Object> result = new HashMap<>();
  413. Project project = projectMapper.queryByName(projectName);
  414. Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
  415. Status resultEnum = (Status) checkResult.get(Constants.STATUS);
  416. if (resultEnum != Status.SUCCESS) {
  417. return checkResult;
  418. }
  419. ReleaseState state = ReleaseState.getEnum(releaseState);
  420. switch (state) {
  421. case ONLINE: {
  422. processDefineMapper.updateProcessDefinitionReleaseState(id, state);
  423. break;
  424. }
  425. case OFFLINE: {
  426. processDefineMapper.updateProcessDefinitionReleaseState(id, state);
  427. List<Schedule> scheduleList = scheduleMapper.selectAllByProcessDefineArray(new int[]{id});
  428. for(Schedule schedule:scheduleList){
  429. logger.info("set schedule offline, schedule id: {}, process definition id: {}", project.getId(), schedule.getId(), id);
  430. // set status
  431. schedule.setReleaseState(ReleaseState.OFFLINE);
  432. scheduleMapper.update(schedule);
  433. deleteSchedule(project.getId(), schedule.getId());
  434. }
  435. break;
  436. }
  437. default: {
  438. putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "releaseState");
  439. return result;
  440. }
  441. }
  442. putMsg(result, Status.SUCCESS);
  443. return result;
  444. }
  445. /**
  446. * export process definition by id
  447. *
  448. * @param loginUser
  449. * @param projectName
  450. * @param processDefinitionId
  451. * @return
  452. */
  453. public void exportProcessDefinitionById(User loginUser, String projectName, Integer processDefinitionId, HttpServletResponse response) {
  454. Project project = projectMapper.queryByName(projectName);
  455. Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
  456. Status resultStatus = (Status) checkResult.get(Constants.STATUS);
  457. if (resultStatus == Status.SUCCESS) {
  458. ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefinitionId);
  459. if (processDefinition != null) {
  460. JSONObject jsonObject = JSONUtils.parseObject(processDefinition.getProcessDefinitionJson());
  461. JSONArray jsonArray = (JSONArray) jsonObject.get("tasks");
  462. for (int i = 0; i < jsonArray.size(); i++) {
  463. JSONObject taskNode = jsonArray.getJSONObject(i);
  464. if (taskNode.get("type") != null && taskNode.get("type") != "") {
  465. String taskType = taskNode.getString("type");
  466. if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){
  467. JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params"));
  468. DataSource dataSource = dataSourceMapper.queryById((Integer) sqlParameters.get("datasource"));
  469. if (dataSource != null) {
  470. sqlParameters.put("datasourceName", dataSource.getName());
  471. }
  472. taskNode.put("params", sqlParameters);
  473. }
  474. }
  475. }
  476. jsonObject.put("tasks", jsonArray);
  477. processDefinition.setProcessDefinitionJson(jsonObject.toString());
  478. Map<String, Object> row = new LinkedHashMap<>();
  479. row.put("projectName", processDefinition.getProjectName());
  480. row.put("processDefinitionName", processDefinition.getName());
  481. row.put("processDefinitionJson", processDefinition.getProcessDefinitionJson());
  482. row.put("processDefinitionDesc", processDefinition.getDesc());
  483. row.put("processDefinitionLocations", processDefinition.getLocations());
  484. row.put("processDefinitionConnects", processDefinition.getConnects());
  485. List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId);
  486. if (schedules.size() > 0) {
  487. Schedule schedule = schedules.get(0);
  488. row.put("scheduleWarningType", schedule.getWarningType());
  489. row.put("scheduleWarningGroupId", schedule.getWarningGroupId());
  490. row.put("scheduleStartTime", schedule.getStartTime());
  491. row.put("scheduleEndTime", schedule.getEndTime());
  492. row.put("scheduleCrontab", schedule.getCrontab());
  493. row.put("scheduleFailureStrategy", schedule.getFailureStrategy());
  494. row.put("scheduleReleaseState", schedule.getReleaseState());
  495. row.put("scheduleProcessInstancePriority", schedule.getProcessInstancePriority());
  496. if(schedule.getId() == -1){
  497. row.put("scheduleWorkerGroupId", -1);
  498. }else{
  499. WorkerGroup workerGroup = workerGroupMapper.queryById(schedule.getId());
  500. if(workerGroup != null){
  501. row.put("scheduleWorkerGroupName", workerGroup.getName());
  502. }
  503. }
  504. }
  505. String rowsJson = JSONUtils.toJsonString(row);
  506. response.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE);
  507. response.setHeader("Content-Disposition", "attachment;filename="+processDefinition.getName()+".json");
  508. BufferedOutputStream buff = null;
  509. ServletOutputStream out = null;
  510. try {
  511. out = response.getOutputStream();
  512. buff = new BufferedOutputStream(out);
  513. buff.write(rowsJson.getBytes("UTF-8"));
  514. buff.flush();
  515. buff.close();
  516. } catch (IOException e) {
  517. e.printStackTrace();
  518. }finally {
  519. try {
  520. buff.close();
  521. out.close();
  522. } catch (Exception e) {
  523. e.printStackTrace();
  524. }
  525. }
  526. }
  527. }
  528. }
  529. @Transactional(value = "TransactionManager", rollbackFor = Exception.class)
  530. public Map<String, Object> importProcessDefinition(User loginUser, MultipartFile file) {
  531. Map<String, Object> result = new HashMap<>(5);
  532. JSONObject json = null;
  533. try(InputStreamReader inputStreamReader = new InputStreamReader( file.getInputStream(), "UTF-8" )) {
  534. BufferedReader streamReader = new BufferedReader(inputStreamReader);
  535. StringBuilder respomseStrBuilder = new StringBuilder();
  536. String inputStr = "";
  537. while ((inputStr = streamReader.readLine())!= null){
  538. respomseStrBuilder.append( inputStr );
  539. }
  540. json = JSONObject.parseObject( respomseStrBuilder.toString() );
  541. if(json != null){
  542. String projectName = null;
  543. String processDefinitionName = null;
  544. String processDefinitionJson = null;
  545. String processDefinitionDesc = null;
  546. String processDefinitionLocations = null;
  547. String processDefinitionConnects = null;
  548. String scheduleWarningType = null;
  549. String scheduleWarningGroupId = null;
  550. String scheduleStartTime = null;
  551. String scheduleEndTime = null;
  552. String scheduleCrontab = null;
  553. String scheduleFailureStrategy = null;
  554. String scheduleReleaseState = null;
  555. String scheduleProcessInstancePriority = null;
  556. String scheduleWorkerGroupId = null;
  557. String scheduleWorkerGroupName = null;
  558. if (ObjectUtils.allNotNull(json.get("projectName"))) {
  559. projectName = json.get("projectName").toString();
  560. } else {
  561. putMsg(result, Status.DATA_IS_NULL, "processDefinitionName");
  562. }
  563. if (ObjectUtils.allNotNull(json.get("processDefinitionName"))) {
  564. processDefinitionName = json.get("processDefinitionName").toString();
  565. } else {
  566. putMsg(result, Status.DATA_IS_NULL, "processDefinitionName");
  567. }
  568. if (ObjectUtils.allNotNull(json.get("processDefinitionJson"))) {
  569. processDefinitionJson = json.get("processDefinitionJson").toString();
  570. } else {
  571. putMsg(result, Status.DATA_IS_NULL, "processDefinitionJson");
  572. }
  573. if (ObjectUtils.allNotNull(json.get("processDefinitionDesc"))) {
  574. processDefinitionDesc = json.get("processDefinitionDesc").toString();
  575. }
  576. if (ObjectUtils.allNotNull(json.get("processDefinitionLocations"))) {
  577. processDefinitionLocations = json.get("processDefinitionLocations").toString();
  578. }
  579. if (ObjectUtils.allNotNull(json.get("processDefinitionConnects"))) {
  580. processDefinitionConnects = json.get("processDefinitionConnects").toString();
  581. }
  582. JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson);
  583. JSONArray jsonArray = (JSONArray) jsonObject.get("tasks");
  584. for (int j = 0; j < jsonArray.size(); j++) {
  585. JSONObject taskNode = jsonArray.getJSONObject(j);
  586. JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params"));
  587. List<DataSource> dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.getString("datasourceName"));
  588. if (dataSources.size() > 0) {
  589. DataSource dataSource = dataSources.get(0);
  590. sqlParameters.put("datasource", dataSource.getId());
  591. }
  592. taskNode.put("params", sqlParameters);
  593. }
  594. jsonObject.put("tasks", jsonArray);
  595. Map<String, Object> createProcessDefinitionResult = createProcessDefinition(loginUser,projectName,processDefinitionName,jsonObject.toString(),processDefinitionDesc,processDefinitionLocations,processDefinitionConnects);
  596. Integer processDefinitionId = null;
  597. if (ObjectUtils.allNotNull(createProcessDefinitionResult.get("processDefinitionId"))) {
  598. processDefinitionId = Integer.parseInt(createProcessDefinitionResult.get("processDefinitionId").toString());
  599. }
  600. if (ObjectUtils.allNotNull(json.get("scheduleCrontab")) && processDefinitionId != null) {
  601. Date now = new Date();
  602. Schedule scheduleObj = new Schedule();
  603. scheduleObj.setProjectName(projectName);
  604. scheduleObj.setProcessDefinitionId(processDefinitionId);
  605. scheduleObj.setProcessDefinitionName(processDefinitionName);
  606. scheduleObj.setCreateTime(now);
  607. scheduleObj.setUpdateTime(now);
  608. scheduleObj.setUserId(loginUser.getId());
  609. scheduleObj.setUserName(loginUser.getUserName());
  610. scheduleCrontab = json.get("scheduleCrontab").toString();
  611. scheduleObj.setCrontab(scheduleCrontab);
  612. if (ObjectUtils.allNotNull(json.get("scheduleStartTime"))) {
  613. scheduleStartTime = json.get("scheduleStartTime").toString();
  614. scheduleObj.setStartTime(DateUtils.stringToDate(scheduleStartTime));
  615. }
  616. if (ObjectUtils.allNotNull(json.get("scheduleEndTime"))) {
  617. scheduleEndTime = json.get("scheduleEndTime").toString();
  618. scheduleObj.setEndTime(DateUtils.stringToDate(scheduleEndTime));
  619. }
  620. if (ObjectUtils.allNotNull(json.get("scheduleWarningType"))) {
  621. scheduleWarningType = json.get("scheduleWarningType").toString();
  622. scheduleObj.setWarningType(WarningType.valueOf(scheduleWarningType));
  623. }
  624. if (ObjectUtils.allNotNull(json.get("scheduleWarningGroupId"))) {
  625. scheduleWarningGroupId = json.get("scheduleWarningGroupId").toString();
  626. scheduleObj.setWarningGroupId(Integer.parseInt(scheduleWarningGroupId));
  627. }
  628. if (ObjectUtils.allNotNull(json.get("scheduleFailureStrategy"))) {
  629. scheduleFailureStrategy = json.get("scheduleFailureStrategy").toString();
  630. scheduleObj.setFailureStrategy(FailureStrategy.valueOf(scheduleFailureStrategy));
  631. }
  632. if (ObjectUtils.allNotNull(json.get("scheduleReleaseState"))) {
  633. scheduleReleaseState = json.get("scheduleReleaseState").toString();
  634. scheduleObj.setReleaseState(ReleaseState.valueOf(scheduleReleaseState));
  635. }
  636. if (ObjectUtils.allNotNull(json.get("scheduleProcessInstancePriority"))) {
  637. scheduleProcessInstancePriority = json.get("scheduleProcessInstancePriority").toString();
  638. scheduleObj.setProcessInstancePriority(Priority.valueOf(scheduleProcessInstancePriority));
  639. }
  640. if (ObjectUtils.allNotNull(json.get("scheduleWorkerGroupId"))) {
  641. scheduleWorkerGroupId = json.get("scheduleWorkerGroupId").toString();
  642. if(scheduleWorkerGroupId != null){
  643. scheduleObj.setWorkerGroupId(Integer.parseInt(scheduleWorkerGroupId));
  644. }else{
  645. if (ObjectUtils.allNotNull(json.get("scheduleWorkerGroupName"))) {
  646. scheduleWorkerGroupName = json.get("scheduleWorkerGroupName").toString();
  647. List<WorkerGroup> workerGroups = workerGroupMapper.queryWorkerGroupByName(scheduleWorkerGroupName);
  648. if(workerGroups.size() > 0){
  649. scheduleObj.setWorkerGroupId(workerGroups.get(0).getId());
  650. }
  651. }
  652. }
  653. }
  654. scheduleMapper.insert(scheduleObj);
  655. }
  656. }else{
  657. putMsg(result, Status.EXPORT_PROCESS_DEFINE_BY_ID_ERROR);
  658. return result;
  659. }
  660. } catch (IOException e) {
  661. throw new RuntimeException(e.getMessage(), e);
  662. }
  663. putMsg(result, Status.SUCCESS);
  664. return result;
  665. }
  666. /**
  667. * check the process definition node meets the specifications
  668. *
  669. * @param processData
  670. * @param processDefinitionJson
  671. * @return
  672. */
  673. public Map<String, Object> checkProcessNodeList(ProcessData processData, String processDefinitionJson) {
  674. Map<String, Object> result = new HashMap<>(5);
  675. try {
  676. if (processData == null) {
  677. logger.error("process data is null");
  678. putMsg(result,Status.DATA_IS_NOT_VALID, processDefinitionJson);
  679. return result;
  680. }
  681. // Check whether the task node is normal
  682. List<TaskNode> taskNodes = processData.getTasks();
  683. if (taskNodes == null) {
  684. logger.error("process node info is empty");
  685. putMsg(result, Status.DATA_IS_NULL, processDefinitionJson);
  686. return result;
  687. }
  688. // check has cycle
  689. if (graphHasCycle(taskNodes)) {
  690. logger.error("process DAG has cycle");
  691. putMsg(result, Status.PROCESS_NODE_HAS_CYCLE);
  692. return result;
  693. }
  694. // check whether the process definition json is normal
  695. for (TaskNode taskNode : taskNodes) {
  696. if (!checkTaskNodeParameters(taskNode.getParams(), taskNode.getType())) {
  697. logger.error("task node {} parameter invalid", taskNode.getName());
  698. putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskNode.getName());
  699. return result;
  700. }
  701. // check extra params
  702. checkOtherParams(taskNode.getExtras());
  703. }
  704. putMsg(result,Status.SUCCESS);
  705. } catch (Exception e) {
  706. result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
  707. result.put(Constants.MSG, e.getMessage());
  708. }
  709. return result;
  710. }
  711. /**
  712. * get task node details based on process definition
  713. */
  714. public Map<String, Object> getTaskNodeListByDefinitionId(Integer defineId) throws Exception {
  715. Map<String, Object> result = new HashMap<>();
  716. ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(defineId);
  717. if (processDefinition == null) {
  718. logger.info("process define not exists");
  719. putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinition.getId());
  720. return result;
  721. }
  722. String processDefinitionJson = processDefinition.getProcessDefinitionJson();
  723. ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
  724. List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
  725. result.put(Constants.DATA_LIST, taskNodeList);
  726. putMsg(result, Status.SUCCESS);
  727. return result;
  728. }
  729. /**
  730. * get task node details based on process definition
  731. */
  732. public Map<String, Object> getTaskNodeListByDefinitionIdList(String defineIdList) throws Exception {
  733. Map<String, Object> result = new HashMap<>();
  734. Map<Integer, List<TaskNode>> taskNodeMap = new HashMap<>();
  735. String[] idList = defineIdList.split(",");
  736. List<String> definitionIdList = Arrays.asList(idList);
  737. List<ProcessDefinition> processDefinitionList = processDefineMapper.queryDefinitionListByIdList(definitionIdList);
  738. if (processDefinitionList == null || processDefinitionList.size() ==0) {
  739. logger.info("process definition not exists");
  740. putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineIdList);
  741. return result;
  742. }
  743. for(ProcessDefinition processDefinition : processDefinitionList){
  744. String processDefinitionJson = processDefinition.getProcessDefinitionJson();
  745. ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
  746. List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
  747. taskNodeMap.put(processDefinition.getId(), taskNodeList);
  748. }
  749. result.put(Constants.DATA_LIST, taskNodeMap);
  750. putMsg(result, Status.SUCCESS);
  751. return result;
  752. }
  753. /**
  754. * Encapsulates the TreeView structure
  755. *
  756. * @param processId
  757. * @param limit
  758. * @return
  759. */
  760. public Map<String, Object> viewTree(Integer processId, Integer limit) throws Exception {
  761. Map<String, Object> result = new HashMap<>();
  762. ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processId);
  763. if (processDefinition == null) {
  764. logger.info("process define not exists");
  765. throw new RuntimeException("process define not exists");
  766. }
  767. DAG<String, TaskNode, TaskNodeRelation> dag = genDagGraph(processDefinition);
  768. /**
  769. * nodes that is running
  770. */
  771. Map<String, List<TreeViewDto>> runningNodeMap = new ConcurrentHashMap<>();
  772. /**
  773. * nodes that is waiting torun
  774. */
  775. Map<String, List<TreeViewDto>> waitingRunningNodeMap = new ConcurrentHashMap<>();
  776. /**
  777. * List of process instances
  778. */
  779. List<ProcessInstance> processInstanceList = processInstanceMapper.queryByProcessDefineId(processId, limit);
  780. if (limit > processInstanceList.size()) {
  781. limit = processInstanceList.size();
  782. }
  783. TreeViewDto parentTreeViewDto = new TreeViewDto();
  784. parentTreeViewDto.setName("DAG");
  785. parentTreeViewDto.setType("");
  786. // Specify the process definition, because it is a TreeView for a process definition
  787. for (int i = limit - 1; i >= 0; i--) {
  788. ProcessInstance processInstance = processInstanceList.get(i);
  789. Date endTime = processInstance.getEndTime() == null ? new Date() : processInstance.getEndTime();
  790. parentTreeViewDto.getInstances().add(new Instance(processInstance.getId(), processInstance.getName(), "", processInstance.getState().toString()
  791. , processInstance.getStartTime(), endTime, processInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime())));
  792. }
  793. List<TreeViewDto> parentTreeViewDtoList = new ArrayList<>();
  794. parentTreeViewDtoList.add(parentTreeViewDto);
  795. // Here is the encapsulation task instance
  796. for (String startNode : dag.getBeginNode()) {
  797. runningNodeMap.put(startNode, parentTreeViewDtoList);
  798. }
  799. while (Stopper.isRunning()) {
  800. Set<String> postNodeList = null;
  801. Iterator<Map.Entry<String, List<TreeViewDto>>> iter = runningNodeMap.entrySet().iterator();
  802. while (iter.hasNext()) {
  803. Map.Entry<String, List<TreeViewDto>> en = iter.next();
  804. String nodeName = en.getKey();
  805. parentTreeViewDtoList = en.getValue();
  806. TreeViewDto treeViewDto = new TreeViewDto();
  807. treeViewDto.setName(nodeName);
  808. TaskNode taskNode = dag.getNode(nodeName);
  809. treeViewDto.setType(taskNode.getType());
  810. //set treeViewDto instances
  811. for (int i = limit - 1; i >= 0; i--) {
  812. ProcessInstance processInstance = processInstanceList.get(i);
  813. TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), nodeName);
  814. if (taskInstance == null) {
  815. treeViewDto.getInstances().add(new Instance(-1, "not running", "null"));
  816. } else {
  817. Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime();
  818. Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime();
  819. int subProcessId = 0;
  820. /**
  821. * if process is sub process, the return sub id, or sub id=0
  822. */
  823. if (taskInstance.getTaskType().equals(TaskType.SUB_PROCESS.name())) {
  824. String taskJson = taskInstance.getTaskJson();
  825. taskNode = JSON.parseObject(taskJson, TaskNode.class);
  826. subProcessId = Integer.parseInt(JSON.parseObject(
  827. taskNode.getParams()).getString(CMDPARAM_SUB_PROCESS_DEFINE_ID));
  828. }
  829. treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(), taskInstance.getTaskType(), taskInstance.getState().toString()
  830. , taskInstance.getStartTime(), taskInstance.getEndTime(), taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessId));
  831. }
  832. }
  833. for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) {
  834. pTreeViewDto.getChildren().add(treeViewDto);
  835. }
  836. postNodeList = dag.getSubsequentNodes(nodeName);
  837. if (postNodeList != null && postNodeList.size() > 0) {
  838. for (String nextNodeName : postNodeList) {
  839. List<TreeViewDto> treeViewDtoList = waitingRunningNodeMap.get(nextNodeName);
  840. if (treeViewDtoList != null && treeViewDtoList.size() > 0) {
  841. treeViewDtoList.add(treeViewDto);
  842. waitingRunningNodeMap.put(nextNodeName, treeViewDtoList);
  843. } else {
  844. treeViewDtoList = new ArrayList<>();
  845. treeViewDtoList.add(treeViewDto);
  846. waitingRunningNodeMap.put(nextNodeName, treeViewDtoList);
  847. }
  848. }
  849. }
  850. runningNodeMap.remove(nodeName);
  851. }
  852. if (waitingRunningNodeMap == null || waitingRunningNodeMap.size() == 0) {
  853. break;
  854. } else {
  855. runningNodeMap.putAll(waitingRunningNodeMap);
  856. waitingRunningNodeMap.clear();
  857. }
  858. }
  859. result.put(Constants.DATA_LIST, parentTreeViewDto);
  860. result.put(Constants.STATUS, Status.SUCCESS);
  861. result.put(Constants.MSG, Status.SUCCESS.getMsg());
  862. return result;
  863. }
  864. /**
  865. * Generate the DAG Graph based on the process definition id
  866. *
  867. * @param processDefinition
  868. * @return
  869. * @throws Exception
  870. */
  871. private DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) throws Exception {
  872. String processDefinitionJson = processDefinition.getProcessDefinitionJson();
  873. ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
  874. List<TaskNode> taskNodeList = processData.getTasks();
  875. processDefinition.setGlobalParamList(processData.getGlobalParams());
  876. List<TaskNodeRelation> taskNodeRelations = new ArrayList<>();
  877. // Traverse node information and build relationships
  878. for (TaskNode taskNode : taskNodeList) {
  879. String preTasks = taskNode.getPreTasks();
  880. List<String> preTasksList = JSONUtils.toList(preTasks, String.class);
  881. // If the dependency is not empty
  882. if (preTasksList != null) {
  883. for (String depNode : preTasksList) {
  884. taskNodeRelations.add(new TaskNodeRelation(depNode, taskNode.getName()));
  885. }
  886. }
  887. }
  888. ProcessDag processDag = new ProcessDag();
  889. processDag.setEdges(taskNodeRelations);
  890. processDag.setNodes(taskNodeList);
  891. // Generate concrete Dag to be executed
  892. return genDagGraph(processDag);
  893. }
  894. /**
  895. * Generate the DAG of process
  896. *
  897. * @return DAG
  898. */
  899. private DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDag processDag) {
  900. DAG<String, TaskNode, TaskNodeRelation> dag = new DAG<>();
  901. /**
  902. * Add the ndoes
  903. */
  904. if (CollectionUtils.isNotEmpty(processDag.getNodes())) {
  905. for (TaskNode node : processDag.getNodes()) {
  906. dag.addNode(node.getName(), node);
  907. }
  908. }
  909. /**
  910. * Add the edges
  911. */
  912. if (CollectionUtils.isNotEmpty(processDag.getEdges())) {
  913. for (TaskNodeRelation edge : processDag.getEdges()) {
  914. dag.addEdge(edge.getStartNode(), edge.getEndNode());
  915. }
  916. }
  917. return dag;
  918. }
  919. /**
  920. * whether the graph has a ring
  921. *
  922. * @param taskNodeResponseList
  923. * @return
  924. */
  925. private boolean graphHasCycle(List<TaskNode> taskNodeResponseList) {
  926. DAG<String, TaskNode, String> graph = new DAG<>();
  927. // Fill the vertices
  928. for (TaskNode taskNodeResponse : taskNodeResponseList) {
  929. graph.addNode(taskNodeResponse.getName(), taskNodeResponse);
  930. }
  931. // Fill edge relations
  932. for (TaskNode taskNodeResponse : taskNodeResponseList) {
  933. taskNodeResponse.getPreTasks();
  934. List<String> preTasks = JSONUtils.toList(taskNodeResponse.getPreTasks(),String.class);
  935. if (CollectionUtils.isNotEmpty(preTasks)) {
  936. for (String preTask : preTasks) {
  937. if (!graph.addEdge(preTask, taskNodeResponse.getName())) {
  938. return true;
  939. }
  940. }
  941. }
  942. }
  943. return graph.hasCycle();
  944. }
  945. }