|
@@ -16,24 +16,24 @@
|
|
|
*/
|
|
|
package org.apache.dolphinscheduler.api.service;
|
|
|
|
|
|
+import com.baomidou.mybatisplus.core.metadata.IPage;
|
|
|
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
|
|
import org.apache.dolphinscheduler.api.enums.Status;
|
|
|
import org.apache.dolphinscheduler.api.utils.PageInfo;
|
|
|
import org.apache.dolphinscheduler.common.Constants;
|
|
|
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
|
|
|
+import org.apache.dolphinscheduler.common.utils.DateUtils;
|
|
|
import org.apache.dolphinscheduler.common.utils.StringUtils;
|
|
|
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
|
|
|
+import org.apache.dolphinscheduler.dao.entity.AccessToken;
|
|
|
import org.apache.dolphinscheduler.dao.entity.User;
|
|
|
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
|
|
|
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
|
|
|
-import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
|
|
|
-import com.baomidou.mybatisplus.core.metadata.IPage;
|
|
|
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
|
|
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
-import org.springframework.transaction.annotation.Transactional;
|
|
|
|
|
|
import java.util.*;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
* work group service
|
|
@@ -42,90 +42,13 @@ import java.util.*;
|
|
|
public class WorkerGroupService extends BaseService {
|
|
|
|
|
|
|
|
|
- @Autowired
|
|
|
- WorkerGroupMapper workerGroupMapper;
|
|
|
-
|
|
|
@Autowired
|
|
|
ProcessInstanceMapper processInstanceMapper;
|
|
|
|
|
|
@Autowired
|
|
|
protected ZookeeperCachedOperator zookeeperCachedOperator;
|
|
|
|
|
|
-
|
|
|
- * create or update a worker group
|
|
|
- *
|
|
|
- * @param loginUser login user
|
|
|
- * @param id worker group id
|
|
|
- * @param name worker group name
|
|
|
- * @param ipList ip list
|
|
|
- * @return create or update result code
|
|
|
- */
|
|
|
- public Map<String, Object> saveWorkerGroup(User loginUser,int id, String name, String ipList){
|
|
|
-
|
|
|
- Map<String, Object> result = new HashMap<>(5);
|
|
|
-
|
|
|
-
|
|
|
- if (checkAdmin(loginUser, result)){
|
|
|
- return result;
|
|
|
- }
|
|
|
-
|
|
|
- if(StringUtils.isEmpty(name)){
|
|
|
- putMsg(result, Status.NAME_NULL);
|
|
|
- return result;
|
|
|
- }
|
|
|
- Date now = new Date();
|
|
|
- WorkerGroup workerGroup = null;
|
|
|
- if(id != 0){
|
|
|
- workerGroup = workerGroupMapper.selectById(id);
|
|
|
-
|
|
|
- if (workerGroup == null){
|
|
|
- workerGroup = new WorkerGroup();
|
|
|
- workerGroup.setCreateTime(now);
|
|
|
- }
|
|
|
- }else{
|
|
|
- workerGroup = new WorkerGroup();
|
|
|
- workerGroup.setCreateTime(now);
|
|
|
- }
|
|
|
- workerGroup.setName(name);
|
|
|
- workerGroup.setIpList(ipList);
|
|
|
- workerGroup.setUpdateTime(now);
|
|
|
|
|
|
- if(checkWorkerGroupNameExists(workerGroup)){
|
|
|
- putMsg(result, Status.NAME_EXIST, workerGroup.getName());
|
|
|
- return result;
|
|
|
- }
|
|
|
- if(workerGroup.getId() != 0 ){
|
|
|
- workerGroupMapper.updateById(workerGroup);
|
|
|
- }else{
|
|
|
- workerGroupMapper.insert(workerGroup);
|
|
|
- }
|
|
|
- putMsg(result, Status.SUCCESS);
|
|
|
- return result;
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- * check worker group name exists
|
|
|
- * @param workerGroup
|
|
|
- * @return
|
|
|
- */
|
|
|
- private boolean checkWorkerGroupNameExists(WorkerGroup workerGroup) {
|
|
|
-
|
|
|
- List<WorkerGroup> workerGroupList = workerGroupMapper.queryWorkerGroupByName(workerGroup.getName());
|
|
|
-
|
|
|
- if(CollectionUtils.isNotEmpty(workerGroupList)){
|
|
|
-
|
|
|
- if(workerGroup.getId() == 0){
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
- for(WorkerGroup group : workerGroupList){
|
|
|
- if(group.getId() != workerGroup.getId()){
|
|
|
- return true;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return false;
|
|
|
- }
|
|
|
|
|
|
|
|
|
* query worker group paging
|
|
@@ -138,66 +61,100 @@ public class WorkerGroupService extends BaseService {
|
|
|
*/
|
|
|
public Map<String,Object> queryAllGroupPaging(User loginUser, Integer pageNo, Integer pageSize, String searchVal) {
|
|
|
|
|
|
+
|
|
|
+ Integer fromIndex = (pageNo - 1) * pageSize;
|
|
|
+
|
|
|
+ Integer toIndex = (pageNo - 1) * pageSize + pageSize;
|
|
|
+
|
|
|
Map<String, Object> result = new HashMap<>(5);
|
|
|
if (checkAdmin(loginUser, result)) {
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
- Page<WorkerGroup> page = new Page(pageNo, pageSize);
|
|
|
- IPage<WorkerGroup> workerGroupIPage = workerGroupMapper.queryListPaging(
|
|
|
- page, searchVal);
|
|
|
+ List<WorkerGroup> workerGroups = getWorkerGroups(true);
|
|
|
+
|
|
|
+ List<WorkerGroup> resultDataList = new ArrayList<>();
|
|
|
+
|
|
|
+ if (CollectionUtils.isNotEmpty(workerGroups)){
|
|
|
+ List<WorkerGroup> searchValDataList = new ArrayList<>();
|
|
|
+
|
|
|
+ if (StringUtils.isNotEmpty(searchVal)){
|
|
|
+ for (WorkerGroup workerGroup : workerGroups){
|
|
|
+ if (workerGroup.getName().contains(searchVal)){
|
|
|
+ searchValDataList.add(workerGroup);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }else {
|
|
|
+ searchValDataList = workerGroups;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (searchValDataList.size() < pageSize){
|
|
|
+ toIndex = (pageNo - 1) * pageSize + searchValDataList.size();
|
|
|
+ }
|
|
|
+ resultDataList = searchValDataList.subList(fromIndex, toIndex);
|
|
|
+ }
|
|
|
+
|
|
|
PageInfo<WorkerGroup> pageInfo = new PageInfo<>(pageNo, pageSize);
|
|
|
- pageInfo.setTotalCount((int)workerGroupIPage.getTotal());
|
|
|
- pageInfo.setLists(workerGroupIPage.getRecords());
|
|
|
+ pageInfo.setTotalCount(resultDataList.size());
|
|
|
+ pageInfo.setLists(resultDataList);
|
|
|
+
|
|
|
result.put(Constants.DATA_LIST, pageInfo);
|
|
|
putMsg(result, Status.SUCCESS);
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+
|
|
|
|
|
|
- * delete worker group by id
|
|
|
- * @param id worker group id
|
|
|
- * @return delete result code
|
|
|
+ * query all worker group
|
|
|
+ *
|
|
|
+ * @return all worker group list
|
|
|
*/
|
|
|
- @Transactional(rollbackFor = Exception.class)
|
|
|
- public Map<String,Object> deleteWorkerGroupById(Integer id) {
|
|
|
+ public Map<String,Object> queryAllGroup() {
|
|
|
+ Map<String, Object> result = new HashMap<>();
|
|
|
|
|
|
- Map<String, Object> result = new HashMap<>(5);
|
|
|
+ List<WorkerGroup> workerGroups = getWorkerGroups(false);
|
|
|
|
|
|
- List<ProcessInstance> processInstances = processInstanceMapper.queryByWorkerGroupIdAndStatus(id, Constants.NOT_TERMINATED_STATES);
|
|
|
- if(CollectionUtils.isNotEmpty(processInstances)){
|
|
|
- putMsg(result, Status.DELETE_WORKER_GROUP_BY_ID_FAIL, processInstances.size());
|
|
|
- return result;
|
|
|
- }
|
|
|
- workerGroupMapper.deleteById(id);
|
|
|
- processInstanceMapper.updateProcessInstanceByWorkerGroupId(id, Constants.DEFAULT_WORKER_ID);
|
|
|
+ Set<String> availableWorkerGroupSet = workerGroups.stream()
|
|
|
+ .map(workerGroup -> workerGroup.getName())
|
|
|
+ .collect(Collectors.toSet());
|
|
|
+ result.put(Constants.DATA_LIST, availableWorkerGroupSet);
|
|
|
putMsg(result, Status.SUCCESS);
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
|
|
|
- * query all worker group
|
|
|
+ * get worker groups
|
|
|
*
|
|
|
- * @return all worker group list
|
|
|
+ * @param isPaging whether paging
|
|
|
+ * @return WorkerGroup list
|
|
|
*/
|
|
|
- public Map<String,Object> queryAllGroup() {
|
|
|
- Map<String, Object> result = new HashMap<>();
|
|
|
+ private List<WorkerGroup> getWorkerGroups(boolean isPaging) {
|
|
|
String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot()+"/nodes" +"/worker";
|
|
|
List<String> workerGroupList = zookeeperCachedOperator.getChildrenKeys(workerPath);
|
|
|
|
|
|
|
|
|
List<String> availableWorkerGroupList = new ArrayList<>();
|
|
|
|
|
|
+ List<WorkerGroup> workerGroups = new ArrayList<>();
|
|
|
+
|
|
|
for (String workerGroup : workerGroupList){
|
|
|
String workerGroupPath= workerPath + "/" + workerGroup;
|
|
|
List<String> childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath);
|
|
|
if (CollectionUtils.isNotEmpty(childrenNodes)){
|
|
|
availableWorkerGroupList.add(workerGroup);
|
|
|
+ WorkerGroup wg = new WorkerGroup();
|
|
|
+ wg.setName(workerGroup);
|
|
|
+ if (isPaging){
|
|
|
+ wg.setIpList(childrenNodes);
|
|
|
+ String registeredIpValue = zookeeperCachedOperator.get(workerGroupPath + "/" + childrenNodes.get(0));
|
|
|
+ wg.setCreateTime(DateUtils.stringToDate(registeredIpValue.split(",")[3]));
|
|
|
+ wg.setUpdateTime(DateUtils.stringToDate(registeredIpValue.split(",")[4]));
|
|
|
+ }
|
|
|
+ workerGroups.add(wg);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- result.put(Constants.DATA_LIST, availableWorkerGroupList);
|
|
|
- putMsg(result, Status.SUCCESS);
|
|
|
- return result;
|
|
|
+ return workerGroups;
|
|
|
}
|
|
|
}
|