|
@@ -14,11 +14,9 @@
|
|
|
* See the License for the specific language governing permissions and
|
|
|
* limitations under the License.
|
|
|
*/
|
|
|
-package queue;
|
|
|
+package org.apache.dolphinscheduler.service.zk;
|
|
|
|
|
|
-import org.apache.curator.framework.CuratorFramework;
|
|
|
-import org.apache.curator.framework.CuratorFrameworkFactory;
|
|
|
-import org.apache.curator.retry.ExponentialBackoffRetry;
|
|
|
+import org.apache.zookeeper.server.ZooKeeperServer;
|
|
|
import org.apache.zookeeper.server.ZooKeeperServerMain;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
|
|
|
import org.slf4j.Logger;
|
|
@@ -26,27 +24,45 @@ import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
|
|
|
/**
|
|
|
- * just for test
|
|
|
+ * just speed experience version
|
|
|
+ * embedded zookeeper service
|
|
|
*/
|
|
|
public class ZKServer {
|
|
|
-
|
|
|
private static final Logger logger = LoggerFactory.getLogger(ZKServer.class);
|
|
|
|
|
|
private static volatile PublicZooKeeperServerMain zkServer = null;
|
|
|
|
|
|
public static final int DEFAULT_ZK_TEST_PORT = 2181;
|
|
|
|
|
|
- public static final String DEFAULT_ZK_STR = "localhost:" + DEFAULT_ZK_TEST_PORT;
|
|
|
-
|
|
|
private static String dataDir = null;
|
|
|
|
|
|
private static final AtomicBoolean isStarted = new AtomicBoolean(false);
|
|
|
|
|
|
+ public static void main(String[] args) {
|
|
|
+ if(!isStarted()){
|
|
|
+ ZKServer.start();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * register hooks, which are called before the process exits
|
|
|
+ */
|
|
|
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ stop();
|
|
|
+ }
|
|
|
+ }));
|
|
|
+ }else{
|
|
|
+ logger.info("zk server aleady started");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * start service
|
|
|
+ */
|
|
|
public static void start() {
|
|
|
try {
|
|
|
startLocalZkServer(DEFAULT_ZK_TEST_PORT);
|
|
@@ -79,7 +95,8 @@ public class ZKServer {
|
|
|
* @param port The port to listen on
|
|
|
*/
|
|
|
public static void startLocalZkServer(final int port) {
|
|
|
- startLocalZkServer(port, org.apache.commons.io.FileUtils.getTempDirectoryPath() + File.separator + "test-" + System.currentTimeMillis());
|
|
|
+
|
|
|
+ startLocalZkServer(port, System.getProperty("user.dir") +"/zookeeper_data", ZooKeeperServer.DEFAULT_TICK_TIME,"20");
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -87,48 +104,28 @@ public class ZKServer {
|
|
|
*
|
|
|
* @param port The port to listen on
|
|
|
* @param dataDirPath The path for the Zk data directory
|
|
|
+ * @param tickTime zk tick time
|
|
|
+ * @param maxClientCnxns zk max client connections
|
|
|
*/
|
|
|
- private static synchronized void startLocalZkServer(final int port, final String dataDirPath) {
|
|
|
+ private static synchronized void startLocalZkServer(final int port, final String dataDirPath,final int tickTime,String maxClientCnxns) {
|
|
|
if (zkServer != null) {
|
|
|
throw new RuntimeException("Zookeeper server is already started!");
|
|
|
}
|
|
|
- try {
|
|
|
- zkServer = new PublicZooKeeperServerMain();
|
|
|
- logger.info("Zookeeper data path : {} ", dataDirPath);
|
|
|
- dataDir = dataDirPath;
|
|
|
- final String[] args = new String[]{Integer.toString(port), dataDirPath};
|
|
|
- Thread init = new Thread(new Runnable() {
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- try {
|
|
|
- zkServer.initializeAndRun(args);
|
|
|
- } catch (QuorumPeerConfig.ConfigException e) {
|
|
|
- logger.warn("Caught exception while starting ZK", e);
|
|
|
- } catch (IOException e) {
|
|
|
- logger.warn("Caught exception while starting ZK", e);
|
|
|
- }
|
|
|
- }
|
|
|
- }, "init-zk-thread");
|
|
|
- init.start();
|
|
|
- } catch (Exception e) {
|
|
|
- logger.warn("Caught exception while starting ZK", e);
|
|
|
- throw new RuntimeException(e);
|
|
|
- }
|
|
|
-
|
|
|
- CuratorFramework zkClient = CuratorFrameworkFactory.builder()
|
|
|
- .connectString(DEFAULT_ZK_STR)
|
|
|
- .retryPolicy(new ExponentialBackoffRetry(10,100))
|
|
|
- .sessionTimeoutMs(1000 * 30)
|
|
|
- .connectionTimeoutMs(1000 * 30)
|
|
|
- .build();
|
|
|
+ zkServer = new PublicZooKeeperServerMain();
|
|
|
+ logger.info("Zookeeper data path : {} ", dataDirPath);
|
|
|
+ dataDir = dataDirPath;
|
|
|
+ final String[] args = new String[]{Integer.toString(port), dataDirPath, Integer.toString(tickTime), maxClientCnxns};
|
|
|
|
|
|
try {
|
|
|
- zkClient.blockUntilConnected(10, TimeUnit.SECONDS);
|
|
|
- zkClient.close();
|
|
|
- } catch (InterruptedException ignore) {
|
|
|
+ logger.info("Zookeeper server started ");
|
|
|
+ isStarted.compareAndSet(false, true);
|
|
|
+
|
|
|
+ zkServer.initializeAndRun(args);
|
|
|
+ } catch (QuorumPeerConfig.ConfigException e) {
|
|
|
+ logger.warn("Caught exception while starting ZK", e);
|
|
|
+ } catch (IOException e) {
|
|
|
+ logger.warn("Caught exception while starting ZK", e);
|
|
|
}
|
|
|
- isStarted.compareAndSet(false, true);
|
|
|
- logger.info("zk server started");
|
|
|
}
|
|
|
|
|
|
/**
|