|
@@ -14,8 +14,20 @@
|
|
|
* See the License for the specific language governing permissions and
|
|
|
* limitations under the License.
|
|
|
*/
|
|
|
+
|
|
|
package org.apache.dolphinscheduler.server.master.registry;
|
|
|
|
|
|
+import org.apache.dolphinscheduler.common.utils.DateUtils;
|
|
|
+import org.apache.dolphinscheduler.common.utils.NetUtils;
|
|
|
+import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
|
|
|
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
|
|
|
+import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
|
|
|
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
|
|
|
+
|
|
|
+import org.apache.curator.framework.CuratorFramework;
|
|
|
+import org.apache.curator.framework.state.ConnectionState;
|
|
|
+import org.apache.curator.framework.state.ConnectionStateListener;
|
|
|
+
|
|
|
import java.util.Date;
|
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
@@ -23,15 +35,6 @@ import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
|
|
|
-import org.apache.curator.framework.CuratorFramework;
|
|
|
-import org.apache.curator.framework.state.ConnectionState;
|
|
|
-import org.apache.curator.framework.state.ConnectionStateListener;
|
|
|
-import org.apache.dolphinscheduler.common.utils.DateUtils;
|
|
|
-import org.apache.dolphinscheduler.common.utils.NetUtils;
|
|
|
-import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
|
|
|
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
|
|
|
-import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
|
|
|
-import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
@@ -40,7 +43,7 @@ import org.springframework.stereotype.Service;
|
|
|
import com.google.common.collect.Sets;
|
|
|
|
|
|
/**
|
|
|
- * master registry
|
|
|
+ * master registry
|
|
|
*/
|
|
|
@Service
|
|
|
public class MasterRegistry {
|
|
@@ -48,7 +51,7 @@ public class MasterRegistry {
|
|
|
private final Logger logger = LoggerFactory.getLogger(MasterRegistry.class);
|
|
|
|
|
|
/**
|
|
|
- * zookeeper registry center
|
|
|
+ * zookeeper registry center
|
|
|
*/
|
|
|
@Autowired
|
|
|
private ZookeeperRegistryCenter zookeeperRegistryCenter;
|
|
@@ -65,19 +68,18 @@ public class MasterRegistry {
|
|
|
private ScheduledExecutorService heartBeatExecutor;
|
|
|
|
|
|
/**
|
|
|
- * worker start time
|
|
|
+ * master start time
|
|
|
*/
|
|
|
private String startTime;
|
|
|
|
|
|
-
|
|
|
@PostConstruct
|
|
|
- public void init(){
|
|
|
+ public void init() {
|
|
|
this.startTime = DateUtils.dateToString(new Date());
|
|
|
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * registry
|
|
|
+ * registry
|
|
|
*/
|
|
|
public void registry() {
|
|
|
String address = NetUtils.getHost();
|
|
@@ -86,12 +88,12 @@ public class MasterRegistry {
|
|
|
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {
|
|
|
@Override
|
|
|
public void stateChanged(CuratorFramework client, ConnectionState newState) {
|
|
|
- if(newState == ConnectionState.LOST){
|
|
|
+ if (newState == ConnectionState.LOST) {
|
|
|
logger.error("master : {} connection lost from zookeeper", address);
|
|
|
- } else if(newState == ConnectionState.RECONNECTED){
|
|
|
+ } else if (newState == ConnectionState.RECONNECTED) {
|
|
|
logger.info("master : {} reconnected to zookeeper", address);
|
|
|
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, "");
|
|
|
- } else if(newState == ConnectionState.SUSPENDED){
|
|
|
+ } else if (newState == ConnectionState.SUSPENDED) {
|
|
|
logger.warn("master : {} connection SUSPENDED ", address);
|
|
|
}
|
|
|
}
|
|
@@ -103,36 +105,35 @@ public class MasterRegistry {
|
|
|
Sets.newHashSet(getMasterPath()),
|
|
|
zookeeperRegistryCenter);
|
|
|
|
|
|
- this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
|
|
|
- logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);
|
|
|
+ this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0, masterHeartbeatInterval, TimeUnit.SECONDS);
|
|
|
+ logger.info("master node : {} registry to ZK path {} successfully with heartBeatInterval : {}s"
|
|
|
+ , address, localNodePath, masterHeartbeatInterval);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * remove registry info
|
|
|
+ * remove registry info
|
|
|
*/
|
|
|
public void unRegistry() {
|
|
|
String address = getLocalAddress();
|
|
|
String localNodePath = getMasterPath();
|
|
|
heartBeatExecutor.shutdownNow();
|
|
|
zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath);
|
|
|
- logger.info("master node : {} unRegistry to ZK.", address);
|
|
|
+ logger.info("master node : {} unRegistry from ZK path {}."
|
|
|
+ , address, localNodePath);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * get master path
|
|
|
- * @return
|
|
|
+ * get master path
|
|
|
*/
|
|
|
private String getMasterPath() {
|
|
|
String address = getLocalAddress();
|
|
|
- String localNodePath = this.zookeeperRegistryCenter.getMasterPath() + "/" + address;
|
|
|
- return localNodePath;
|
|
|
+ return this.zookeeperRegistryCenter.getMasterPath() + "/" + address;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * get local address
|
|
|
- * @return
|
|
|
+ * get local address
|
|
|
*/
|
|
|
- private String getLocalAddress(){
|
|
|
+ private String getLocalAddress() {
|
|
|
|
|
|
return NetUtils.getHost() + ":" + masterConfig.getListenPort();
|
|
|
|