@@ -21,10 +21,7 @@ import cn.escheduler.api.dto.treeview.TreeViewDto;
 import cn.escheduler.api.enums.Status;
 import cn.escheduler.api.utils.Constants;
 import cn.escheduler.api.utils.PageInfo;
-import cn.escheduler.common.enums.Flag;
-import cn.escheduler.common.enums.ReleaseState;
-import cn.escheduler.common.enums.TaskType;
-import cn.escheduler.common.enums.UserType;
+import cn.escheduler.common.enums.*;
 import cn.escheduler.common.graph.DAG;
 import cn.escheduler.common.model.TaskNode;
 import cn.escheduler.common.model.TaskNodeRelation;
@@ -38,14 +35,25 @@ import cn.escheduler.dao.ProcessDao;
 import cn.escheduler.dao.mapper.*;
 import cn.escheduler.dao.model.*;
 import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
-
+import org.springframework.web.multipart.MultipartFile;
+
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletResponse;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -86,6 +94,12 @@ public class ProcessDefinitionService extends BaseDAGService {
     @Autowired
     private ProcessDao processDao;
 
+    @Autowired
+    private DataSourceMapper dataSourceMapper;
+
+    @Autowired
+    private WorkerGroupMapper workerGroupMapper;
+
     /**
      * create process definition
      *
@@ -142,7 +156,7 @@ public class ProcessDefinitionService extends BaseDAGService {
         processDefine.setFlag(Flag.YES);
         processDefineMapper.insert(processDefine);
         putMsg(result, Status.SUCCESS);
-
+        result.put("processDefinitionId", processDefine.getId());
         return result;
     }
 
@@ -504,6 +518,239 @@ public class ProcessDefinitionService extends BaseDAGService {
         return result;
     }
 
+
+    /**
+     * export process definition by id
+     *
+     * @param loginUser login user
+     * @param projectName project name
+     * @param processDefinitionId process definition id
+     * @param response http servlet response
+     */
+    public void exportProcessDefinitionById(User loginUser, String projectName, Integer processDefinitionId, HttpServletResponse response) {
+        Project project = projectMapper.queryByName(projectName);
+
+        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
+        Status resultStatus = (Status) checkResult.get(Constants.STATUS);
+        if (resultStatus == Status.SUCCESS) {
+            ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefinitionId);
+            if (processDefinition != null) {
+                JSONObject jsonObject = JSONUtils.parseObject(processDefinition.getProcessDefinitionJson());
+                JSONArray jsonArray = (JSONArray) jsonObject.get("tasks");
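+                // for SQL and PROCEDURE tasks, record the datasource name alongside its id,
+                // so the exported definition can be re-imported where datasource ids differ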
+                for (int i = 0; i < jsonArray.size(); i++) {
+                    JSONObject taskNode = jsonArray.getJSONObject(i);
+                    if (StringUtils.isNotEmpty(taskNode.getString("type"))) {
+                        String taskType = taskNode.getString("type");
+                        if (taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())) {
+                            JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params"));
+                            DataSource dataSource = dataSourceMapper.queryById((Integer) sqlParameters.get("datasource"));
+                            if (dataSource != null) {
+                                sqlParameters.put("datasourceName", dataSource.getName());
+                            }
+                            taskNode.put("params", sqlParameters);
+                        }
+                    }
+                }
+                jsonObject.put("tasks", jsonArray);
+                processDefinition.setProcessDefinitionJson(jsonObject.toString());
+
+                Map<String, Object> row = new LinkedHashMap<>();
+                row.put("projectName", processDefinition.getProjectName());
+                row.put("processDefinitionName", processDefinition.getName());
+                row.put("processDefinitionJson", processDefinition.getProcessDefinitionJson());
+                row.put("processDefinitionDesc", processDefinition.getDesc());
+                row.put("processDefinitionLocations", processDefinition.getLocations());
+                row.put("processDefinitionConnects", processDefinition.getConnects());
+
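+                // if the definition has a schedule, export its settings as well (only the first schedule is used)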
+                List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId);
+                if (schedules.size() > 0) {
+                    Schedule schedule = schedules.get(0);
+                    row.put("scheduleWarningType", schedule.getWarningType());
+                    row.put("scheduleWarningGroupId", schedule.getWarningGroupId());
+                    row.put("scheduleStartTime", schedule.getStartTime());
+                    row.put("scheduleEndTime", schedule.getEndTime());
+                    row.put("scheduleCrontab", schedule.getCrontab());
+                    row.put("scheduleFailureStrategy", schedule.getFailureStrategy());
+                    row.put("scheduleReleaseState", schedule.getReleaseState());
+                    row.put("scheduleProcessInstancePriority", schedule.getProcessInstancePriority());
+                    if (schedule.getWorkerGroupId() == -1) {
+                        row.put("scheduleWorkerGroupId", -1);
+                    } else {
+                        WorkerGroup workerGroup = workerGroupMapper.queryById(schedule.getWorkerGroupId());
+                        if (workerGroup != null) {
+                            row.put("scheduleWorkerGroupName", workerGroup.getName());
+                        }
+                    }
+                }
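+                // serialize the collected fields and stream them back as a json attachment named after the definition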
+                String rowsJson = JSONUtils.toJsonString(row);
+                response.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE);
+                response.setHeader("Content-Disposition", "attachment;filename=" + processDefinition.getName() + ".json");
+                BufferedOutputStream buff = null;
+                ServletOutputStream out = null;
+                try {
+                    out = response.getOutputStream();
+                    buff = new BufferedOutputStream(out);
+                    buff.write(rowsJson.getBytes("UTF-8"));
+                    buff.flush();
+                } catch (IOException e) {
+                    logger.error("export process definition failed", e);
+                } finally {
+                    try {
+                        if (buff != null) {
+                            buff.close();
+                        }
+                        if (out != null) {
+                            out.close();
+                        }
+                    } catch (Exception e) {
+                        logger.error("close export output stream failed", e);
+                    }
+                }
+            }
+        }
+    }
+
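+    /**
+     * import process definition from an exported json file,
+     * and recreate its schedule when schedule fields are present in the file
+     *
+     * @param loginUser login user
+     * @param file uploaded process definition json file
+     * @return import result
+     */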
+    @Transactional(value = "TransactionManager", rollbackFor = Exception.class)
+    public Map<String, Object> importProcessDefinition(User loginUser, MultipartFile file) {
+        Map<String, Object> result = new HashMap<>(5);
+
+        JSONObject json = null;
+        try (InputStreamReader inputStreamReader = new InputStreamReader(file.getInputStream(), "UTF-8");
+             BufferedReader streamReader = new BufferedReader(inputStreamReader)) {
+            StringBuilder responseStrBuilder = new StringBuilder();
+            String inputStr = "";
+            while ((inputStr = streamReader.readLine()) != null) {
+                responseStrBuilder.append(inputStr);
+            }
+            json = JSONObject.parseObject(responseStrBuilder.toString());
+            if (json != null) {
+                String projectName = null;
+                String processDefinitionName = null;
+                String processDefinitionJson = null;
+                String processDefinitionDesc = null;
+                String processDefinitionLocations = null;
+                String processDefinitionConnects = null;
+
+                String scheduleWarningType = null;
+                String scheduleWarningGroupId = null;
+                String scheduleStartTime = null;
+                String scheduleEndTime = null;
+                String scheduleCrontab = null;
+                String scheduleFailureStrategy = null;
+                String scheduleReleaseState = null;
+                String scheduleProcessInstancePriority = null;
+                String scheduleWorkerGroupId = null;
+                String scheduleWorkerGroupName = null;
+
+                if (ObjectUtils.allNotNull(json.get("projectName"))) {
+                    projectName = json.get("projectName").toString();
+                } else {
+                    putMsg(result, Status.DATA_IS_NULL, "projectName");
+                }
+                if (ObjectUtils.allNotNull(json.get("processDefinitionName"))) {
+                    processDefinitionName = json.get("processDefinitionName").toString();
+                } else {
+                    putMsg(result, Status.DATA_IS_NULL, "processDefinitionName");
+                }
+                if (ObjectUtils.allNotNull(json.get("processDefinitionJson"))) {
+                    processDefinitionJson = json.get("processDefinitionJson").toString();
+                } else {
+                    putMsg(result, Status.DATA_IS_NULL, "processDefinitionJson");
+                }
+                if (ObjectUtils.allNotNull(json.get("processDefinitionDesc"))) {
+                    processDefinitionDesc = json.get("processDefinitionDesc").toString();
+                }
+                if (ObjectUtils.allNotNull(json.get("processDefinitionLocations"))) {
+                    processDefinitionLocations = json.get("processDefinitionLocations").toString();
+                }
+                if (ObjectUtils.allNotNull(json.get("processDefinitionConnects"))) {
+                    processDefinitionConnects = json.get("processDefinitionConnects").toString();
+                }
+
+                JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson);
+                JSONArray jsonArray = (JSONArray) jsonObject.get("tasks");
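+                // map the exported datasourceName back to a datasource id in the importing environment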
+                for (int j = 0; j < jsonArray.size(); j++) {
+                    JSONObject taskNode = jsonArray.getJSONObject(j);
+                    JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params"));
+                    List<DataSource> dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.getString("datasourceName"));
+                    if (dataSources.size() > 0) {
+                        DataSource dataSource = dataSources.get(0);
+                        sqlParameters.put("datasource", dataSource.getId());
+                    }
+                    taskNode.put("params", sqlParameters);
+                }
+                jsonObject.put("tasks", jsonArray);
+
+                Map<String, Object> createProcessDefinitionResult = createProcessDefinition(loginUser, projectName, processDefinitionName,
+                        jsonObject.toString(), processDefinitionDesc, processDefinitionLocations, processDefinitionConnects);
+                Integer processDefinitionId = null;
+                if (ObjectUtils.allNotNull(createProcessDefinitionResult.get("processDefinitionId"))) {
+                    processDefinitionId = Integer.parseInt(createProcessDefinitionResult.get("processDefinitionId").toString());
+                }
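+                // if schedule fields were exported, recreate the schedule and attach it to the newly created definition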
+                if (ObjectUtils.allNotNull(json.get("scheduleCrontab")) && processDefinitionId != null) {
+                    Date now = new Date();
+                    Schedule scheduleObj = new Schedule();
+                    scheduleObj.setProjectName(projectName);
+                    scheduleObj.setProcessDefinitionId(processDefinitionId);
+                    scheduleObj.setProcessDefinitionName(processDefinitionName);
+                    scheduleObj.setCreateTime(now);
+                    scheduleObj.setUpdateTime(now);
+                    scheduleObj.setUserId(loginUser.getId());
+                    scheduleObj.setUserName(loginUser.getUserName());
+
+                    scheduleCrontab = json.get("scheduleCrontab").toString();
+                    scheduleObj.setCrontab(scheduleCrontab);
+                    if (ObjectUtils.allNotNull(json.get("scheduleStartTime"))) {
+                        scheduleStartTime = json.get("scheduleStartTime").toString();
+                        scheduleObj.setStartTime(DateUtils.stringToDate(scheduleStartTime));
+                    }
+                    if (ObjectUtils.allNotNull(json.get("scheduleEndTime"))) {
+                        scheduleEndTime = json.get("scheduleEndTime").toString();
+                        scheduleObj.setEndTime(DateUtils.stringToDate(scheduleEndTime));
+                    }
+                    if (ObjectUtils.allNotNull(json.get("scheduleWarningType"))) {
+                        scheduleWarningType = json.get("scheduleWarningType").toString();
+                        scheduleObj.setWarningType(WarningType.valueOf(scheduleWarningType));
+                    }
+                    if (ObjectUtils.allNotNull(json.get("scheduleWarningGroupId"))) {
+                        scheduleWarningGroupId = json.get("scheduleWarningGroupId").toString();
+                        scheduleObj.setWarningGroupId(Integer.parseInt(scheduleWarningGroupId));
+                    }
+                    if (ObjectUtils.allNotNull(json.get("scheduleFailureStrategy"))) {
+                        scheduleFailureStrategy = json.get("scheduleFailureStrategy").toString();
+                        scheduleObj.setFailureStrategy(FailureStrategy.valueOf(scheduleFailureStrategy));
+                    }
+                    if (ObjectUtils.allNotNull(json.get("scheduleReleaseState"))) {
+                        scheduleReleaseState = json.get("scheduleReleaseState").toString();
+                        scheduleObj.setReleaseState(ReleaseState.valueOf(scheduleReleaseState));
+                    }
+                    if (ObjectUtils.allNotNull(json.get("scheduleProcessInstancePriority"))) {
+                        scheduleProcessInstancePriority = json.get("scheduleProcessInstancePriority").toString();
+                        scheduleObj.setProcessInstancePriority(Priority.valueOf(scheduleProcessInstancePriority));
+                    }
+                    // the export writes either a workerGroupId of -1 or a workerGroupName, so fall back to the name lookup
+                    if (ObjectUtils.allNotNull(json.get("scheduleWorkerGroupId"))) {
+                        scheduleWorkerGroupId = json.get("scheduleWorkerGroupId").toString();
+                        scheduleObj.setWorkerGroupId(Integer.parseInt(scheduleWorkerGroupId));
+                    } else if (ObjectUtils.allNotNull(json.get("scheduleWorkerGroupName"))) {
+                        scheduleWorkerGroupName = json.get("scheduleWorkerGroupName").toString();
+                        List<WorkerGroup> workerGroups = workerGroupMapper.queryWorkerGroupByName(scheduleWorkerGroupName);
+                        if (workerGroups.size() > 0) {
+                            scheduleObj.setWorkerGroupId(workerGroups.get(0).getId());
+                        }
+                    }
+                    scheduleMapper.insert(scheduleObj);
+                }
+            } else {
+                putMsg(result, Status.EXPORT_PROCESS_DEFINE_BY_ID_ERROR);
+                return result;
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
+
 
     /**
      * check the process definition node meets the specifications