|
@@ -14,8 +14,11 @@
|
|
|
* See the License for the specific language governing permissions and
|
|
|
* limitations under the License.
|
|
|
*/
|
|
|
+
|
|
|
package org.apache.dolphinscheduler.api.service;
|
|
|
|
|
|
+import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
|
|
|
+
|
|
|
import org.apache.dolphinscheduler.api.enums.Status;
|
|
|
import org.apache.dolphinscheduler.api.utils.PageInfo;
|
|
|
import org.apache.dolphinscheduler.common.Constants;
|
|
@@ -26,11 +29,17 @@ import org.apache.dolphinscheduler.dao.entity.User;
|
|
|
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
|
|
|
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
|
|
|
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
|
|
|
+
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
-import java.util.*;
|
|
|
-import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
* work group service
|
|
@@ -38,14 +47,11 @@ import java.util.stream.Collectors;
|
|
|
@Service
|
|
|
public class WorkerGroupService extends BaseService {
|
|
|
|
|
|
-
|
|
|
- @Autowired
|
|
|
- ProcessInstanceMapper processInstanceMapper;
|
|
|
-
|
|
|
+ private static final String NO_NODE_EXCEPTION_REGEX = "KeeperException$NoNodeException";
|
|
|
@Autowired
|
|
|
protected ZookeeperCachedOperator zookeeperCachedOperator;
|
|
|
-
|
|
|
-
|
|
|
+ @Autowired
|
|
|
+ ProcessInstanceMapper processInstanceMapper;
|
|
|
|
|
|
/**
|
|
|
* query worker group paging
|
|
@@ -56,7 +62,7 @@ public class WorkerGroupService extends BaseService {
|
|
|
* @param pageSize page size
|
|
|
* @return worker group list page
|
|
|
*/
|
|
|
- public Map<String,Object> queryAllGroupPaging(User loginUser, Integer pageNo, Integer pageSize, String searchVal) {
|
|
|
+ public Map<String, Object> queryAllGroupPaging(User loginUser, Integer pageNo, Integer pageSize, String searchVal) {
|
|
|
|
|
|
// list from index
|
|
|
Integer fromIndex = (pageNo - 1) * pageSize;
|
|
@@ -72,20 +78,20 @@ public class WorkerGroupService extends BaseService {
|
|
|
|
|
|
List<WorkerGroup> resultDataList = new ArrayList<>();
|
|
|
|
|
|
- if (CollectionUtils.isNotEmpty(workerGroups)){
|
|
|
+ if (CollectionUtils.isNotEmpty(workerGroups)) {
|
|
|
List<WorkerGroup> searchValDataList = new ArrayList<>();
|
|
|
|
|
|
- if (StringUtils.isNotEmpty(searchVal)){
|
|
|
- for (WorkerGroup workerGroup : workerGroups){
|
|
|
- if (workerGroup.getName().contains(searchVal)){
|
|
|
+ if (StringUtils.isNotEmpty(searchVal)) {
|
|
|
+ for (WorkerGroup workerGroup : workerGroups) {
|
|
|
+ if (workerGroup.getName().contains(searchVal)) {
|
|
|
searchValDataList.add(workerGroup);
|
|
|
}
|
|
|
}
|
|
|
- }else {
|
|
|
+ } else {
|
|
|
searchValDataList = workerGroups;
|
|
|
}
|
|
|
|
|
|
- if (searchValDataList.size() < pageSize){
|
|
|
+ if (searchValDataList.size() < pageSize) {
|
|
|
toIndex = (pageNo - 1) * pageSize + searchValDataList.size();
|
|
|
}
|
|
|
resultDataList = searchValDataList.subList(fromIndex, toIndex);
|
|
@@ -100,14 +106,12 @@ public class WorkerGroupService extends BaseService {
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-
|
|
|
/**
|
|
|
* query all worker group
|
|
|
*
|
|
|
* @return all worker group list
|
|
|
*/
|
|
|
- public Map<String,Object> queryAllGroup() {
|
|
|
+ public Map<String, Object> queryAllGroup() {
|
|
|
Map<String, Object> result = new HashMap<>();
|
|
|
|
|
|
List<WorkerGroup> workerGroups = getWorkerGroups(false);
|
|
@@ -120,30 +124,46 @@ public class WorkerGroupService extends BaseService {
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
/**
|
|
|
- * get worker groups
|
|
|
+ * get worker groups
|
|
|
*
|
|
|
* @param isPaging whether paging
|
|
|
* @return WorkerGroup list
|
|
|
*/
|
|
|
private List<WorkerGroup> getWorkerGroups(boolean isPaging) {
|
|
|
- String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot()+"/nodes" +"/worker";
|
|
|
- List<String> workerGroupList = zookeeperCachedOperator.getChildrenKeys(workerPath);
|
|
|
+
|
|
|
+ String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
|
|
|
+ List<WorkerGroup> workerGroups = new ArrayList<>();
|
|
|
+ List<String> workerGroupList;
|
|
|
+ try {
|
|
|
+ workerGroupList = zookeeperCachedOperator.getChildrenKeys(workerPath);
|
|
|
+ } catch (Exception e) {
|
|
|
+ if (e.getMessage().contains(NO_NODE_EXCEPTION_REGEX)) {
|
|
|
+ if (isPaging) {
|
|
|
+ return workerGroups;
|
|
|
+ } else {
|
|
|
+ //ignore noNodeException return Default
|
|
|
+ WorkerGroup wg = new WorkerGroup();
|
|
|
+ wg.setName(DEFAULT_WORKER_GROUP);
|
|
|
+ workerGroups.add(wg);
|
|
|
+ return workerGroups;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
// available workerGroup list
|
|
|
List<String> availableWorkerGroupList = new ArrayList<>();
|
|
|
|
|
|
- List<WorkerGroup> workerGroups = new ArrayList<>();
|
|
|
-
|
|
|
- for (String workerGroup : workerGroupList){
|
|
|
- String workerGroupPath= workerPath + "/" + workerGroup;
|
|
|
+ for (String workerGroup : workerGroupList) {
|
|
|
+ String workerGroupPath = workerPath + "/" + workerGroup;
|
|
|
List<String> childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath);
|
|
|
- if (CollectionUtils.isNotEmpty(childrenNodes)){
|
|
|
+ if (CollectionUtils.isNotEmpty(childrenNodes)) {
|
|
|
availableWorkerGroupList.add(workerGroup);
|
|
|
WorkerGroup wg = new WorkerGroup();
|
|
|
wg.setName(workerGroup);
|
|
|
- if (isPaging){
|
|
|
+ if (isPaging) {
|
|
|
wg.setIpList(childrenNodes);
|
|
|
String registeredIpValue = zookeeperCachedOperator.get(workerGroupPath + "/" + childrenNodes.get(0));
|
|
|
wg.setCreateTime(DateUtils.stringToDate(registeredIpValue.split(",")[6]));
|