浏览代码

Refactor worker (#2186)

* let quartz use the same datasource

* move master/worker config from dao.properties to each config
add master/worker registry test
Tboy 5 年之前
父节点
当前提交
75efbb5402

+ 0 - 39
dolphinscheduler-dao/src/main/resources/application.properties

@@ -95,45 +95,6 @@ mybatis-plus.configuration.cache-enabled=false
 mybatis-plus.configuration.call-setters-on-nulls=true
 mybatis-plus.configuration.jdbc-type-for-null=null
 
-# master settings
-# master execute thread num
-master.exec.threads=100
-
-# master execute task number in parallel
-master.exec.task.num=20
-
-# master heartbeat interval
-master.heartbeat.interval=10
-
-# master commit task retry times
-master.task.commit.retryTimes=5
-
-# master commit task interval
-master.task.commit.interval=1000
-
-
-# only less than cpu avg load, master server can work. default value : the number of cpu cores * 2
-master.max.cpuload.avg=100
-
-# only larger than reserved memory, master server can work. default value : physical memory * 1/10, unit is G.
-master.reserved.memory=0.1
-
-# worker settings
-# worker execute thread num
-worker.exec.threads=100
-
-# worker heartbeat interval
-worker.heartbeat.interval=10
-
-# submit the number of tasks at a time
-worker.fetch.task.num = 3
-
-# only less than cpu avg load, worker server can work. default value : the number of cpu cores * 2
-worker.max.cpuload.avg=100
-
-# only larger than reserved memory, worker server can work. default value : physical memory * 1/6, unit is G.
-worker.reserved.memory=0.1
-
 # data quality analysis is not currently in use. please ignore the following configuration
 # task record
 task.record.flag=false

+ 5 - 1
dolphinscheduler-server/pom.xml

@@ -128,7 +128,11 @@
 			<artifactId>mockito-core</artifactId>
 			<scope>test</scope>
 		</dependency>
-    </dependencies>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-test</artifactId>
+        </dependency>
+	</dependencies>
 
 
 	<build>

+ 7 - 7
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

@@ -22,25 +22,25 @@ import org.springframework.stereotype.Component;
 @Component
 public class MasterConfig {
 
-    @Value("${master.exec.threads}")
+    @Value("${master.exec.threads:100}")
     private int masterExecThreads;
 
-    @Value("${master.exec.task.num}")
+    @Value("${master.exec.task.num:20}")
     private int masterExecTaskNum;
 
-    @Value("${master.heartbeat.interval}")
+    @Value("${master.heartbeat.interval:10}")
     private int masterHeartbeatInterval;
 
-    @Value("${master.task.commit.retryTimes}")
+    @Value("${master.task.commit.retryTimes:5}")
     private int masterTaskCommitRetryTimes;
 
-    @Value("${master.task.commit.interval}")
+    @Value("${master.task.commit.interval:1000}")
     private int masterTaskCommitInterval;
 
-    @Value("${master.max.cpuload.avg}")
+    @Value("${master.max.cpuload.avg:100}")
     private double masterMaxCpuloadAvg;
 
-    @Value("${master.reserved.memory}")
+    @Value("${master.reserved.memory:0.1}")
     private double masterReservedMemory;
 
     @Value("${master.host.selector:lowerWeight}")

+ 5 - 5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java

@@ -22,19 +22,19 @@ import org.springframework.stereotype.Component;
 @Component
 public class WorkerConfig {
 
-    @Value("${worker.exec.threads}")
+    @Value("${worker.exec.threads: 100}")
     private int workerExecThreads;
 
-    @Value("${worker.heartbeat.interval}")
+    @Value("${worker.heartbeat.interval: 10}")
     private int workerHeartbeatInterval;
 
-    @Value("${worker.fetch.task.num}")
+    @Value("${worker.fetch.task.num: 3}")
     private int workerFetchTaskNum;
 
-    @Value("${worker.max.cpuload.avg}")
+    @Value("${worker.max.cpuload.avg:100}")
     private int workerMaxCpuloadAvg;
 
-    @Value("${worker.reserved.memory}")
+    @Value("${worker.reserved.memory:0.1}")
     private double workerReservedMemory;
 
     @Value("${worker.group: default}")

+ 73 - 0
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java

@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.registry;
+
+import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import org.apache.dolphinscheduler.server.zk.SpringZKServer;
+import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * master registry test
+ */
+@RunWith(SpringRunner.class)
+@ContextConfiguration(classes={SpringZKServer.class, MasterRegistry.class,ZookeeperRegistryCenter.class, MasterConfig.class, ZookeeperCachedOperator.class, ZookeeperConfig.class})
+public class MasterRegistryTest {
+
+    @Autowired
+    private MasterRegistry masterRegistry;
+
+    @Autowired
+    private ZookeeperRegistryCenter zookeeperRegistryCenter;
+
+    @Autowired
+    private MasterConfig masterConfig;
+
+    @Test
+    public void testRegistry() throws InterruptedException {
+        masterRegistry.registry();
+        String masterPath = zookeeperRegistryCenter.getMasterPath();
+        Assert.assertEquals(ZookeeperRegistryCenter.MASTER_PATH, masterPath);
+        TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node
+        String masterNodePath = masterPath + "/" + (Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort());
+        String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(masterNodePath);
+        Assert.assertEquals(5, heartbeat.split(",").length);
+    }
+
+    @Test
+    public void testUnRegistry() throws InterruptedException {
+        masterRegistry.registry();
+        TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node
+        masterRegistry.unRegistry();
+        String masterPath = zookeeperRegistryCenter.getMasterPath();
+        List<String> childrenKeys = zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(masterPath);
+        Assert.assertTrue(childrenKeys.isEmpty());
+    }
+}

+ 77 - 0
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java

@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.registry;
+
+import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.zk.SpringZKServer;
+import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+
+/**
+ * worker registry test
+ */
+@RunWith(SpringRunner.class)
+@ContextConfiguration(classes={SpringZKServer.class, WorkerRegistry.class,ZookeeperRegistryCenter.class, WorkerConfig.class, ZookeeperCachedOperator.class, ZookeeperConfig.class})
+
+public class WorkerRegistryTest {
+
+    @Autowired
+    private WorkerRegistry workerRegistry;
+
+    @Autowired
+    private ZookeeperRegistryCenter zookeeperRegistryCenter;
+
+    @Autowired
+    private WorkerConfig workerConfig;
+
+    @Test
+    public void testRegistry() throws InterruptedException {
+        workerRegistry.registry();
+        String workerPath = zookeeperRegistryCenter.getWorkerPath();
+        Assert.assertEquals(ZookeeperRegistryCenter.WORKER_PATH, workerPath);
+        Assert.assertEquals(DEFAULT_WORKER_GROUP, workerConfig.getWorkerGroup().trim());
+        String instancePath = workerPath + "/" + workerConfig.getWorkerGroup().trim() + "/" + (Constants.LOCAL_ADDRESS + ":" + workerConfig.getListenPort());
+        TimeUnit.SECONDS.sleep(workerConfig.getWorkerHeartbeatInterval() + 2); //wait heartbeat info write into zk node
+        String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(instancePath);
+        Assert.assertEquals(5, heartbeat.split(",").length);
+    }
+
+    @Test
+    public void testUnRegistry() throws InterruptedException {
+        workerRegistry.registry();
+        TimeUnit.SECONDS.sleep(workerConfig.getWorkerHeartbeatInterval() + 2); //wait heartbeat info write into zk node
+        workerRegistry.unRegistry();
+        String workerPath = zookeeperRegistryCenter.getWorkerPath();
+        String workerGroupPath = workerPath + "/" + workerConfig.getWorkerGroup().trim();
+        List<String> childrenKeys = zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(workerGroupPath);
+        Assert.assertTrue(childrenKeys.isEmpty());
+    }
+}

+ 178 - 0
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/zk/SpringZKServer.java

@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.zk;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.PriorityOrdered;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/**
+ * just for test
+ */
+@Service
+public class SpringZKServer implements PriorityOrdered {
+
+    private static final Logger logger = LoggerFactory.getLogger(SpringZKServer.class);
+
+    private static volatile PublicZooKeeperServerMain zkServer = null;
+
+    public static final int DEFAULT_ZK_TEST_PORT = 2181;
+
+    public static final String DEFAULT_ZK_STR = "localhost:" + DEFAULT_ZK_TEST_PORT;
+
+    private static String dataDir = null;
+
+    private static final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+    @PostConstruct
+    public void start() {
+        try {
+            startLocalZkServer(DEFAULT_ZK_TEST_PORT);
+        } catch (Exception e) {
+            logger.error("Failed to start ZK: " + e);
+        }
+    }
+
+    public static boolean isStarted(){
+        return isStarted.get();
+    }
+
+
+    @Override
+    public int getOrder() {
+        return PriorityOrdered.HIGHEST_PRECEDENCE;
+    }
+
+    static class PublicZooKeeperServerMain extends ZooKeeperServerMain {
+
+        @Override
+        public void initializeAndRun(String[] args)
+                throws QuorumPeerConfig.ConfigException, IOException {
+            super.initializeAndRun(args);
+        }
+
+        @Override
+        public void shutdown() {
+            super.shutdown();
+        }
+    }
+
+    /**
+     * Starts a local Zk instance with a generated empty data directory
+     *
+     * @param port The port to listen on
+     */
+    public void startLocalZkServer(final int port) {
+        startLocalZkServer(port, org.apache.commons.io.FileUtils.getTempDirectoryPath() + File.separator + "test-" + System.currentTimeMillis());
+    }
+
+    /**
+     * Starts a local Zk instance
+     *
+     * @param port        The port to listen on
+     * @param dataDirPath The path for the Zk data directory
+     */
+    private void startLocalZkServer(final int port, final String dataDirPath) {
+        if (zkServer != null) {
+            throw new RuntimeException("Zookeeper server is already started!");
+        }
+        try {
+            zkServer = new PublicZooKeeperServerMain();
+            logger.info("Zookeeper data path : {} ", dataDirPath);
+            dataDir = dataDirPath;
+            final String[] args = new String[]{Integer.toString(port), dataDirPath};
+            Thread init = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        System.setProperty("zookeeper.jmx.log4j.disable", "true");
+                        zkServer.initializeAndRun(args);
+                    } catch (QuorumPeerConfig.ConfigException e) {
+                        logger.warn("Caught exception while starting ZK", e);
+                    } catch (IOException e) {
+                        logger.warn("Caught exception while starting ZK", e);
+                    }
+                }
+            }, "init-zk-thread");
+            init.start();
+        } catch (Exception e) {
+            logger.warn("Caught exception while starting ZK", e);
+            throw new RuntimeException(e);
+        }
+
+        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
+                .connectString(DEFAULT_ZK_STR)
+                .retryPolicy(new ExponentialBackoffRetry(10,100))
+                .sessionTimeoutMs(1000 * 30)
+                .connectionTimeoutMs(1000 * 30)
+                .build();
+
+        try {
+            zkClient.blockUntilConnected(10, TimeUnit.SECONDS);
+            zkClient.close();
+        } catch (InterruptedException ignore) {
+        }
+        isStarted.compareAndSet(false, true);
+        logger.info("zk server started");
+    }
+
+    @PreDestroy
+    public void stop() {
+        try {
+            stopLocalZkServer(true);
+            logger.info("zk server stopped");
+
+        } catch (Exception e) {
+            logger.error("Failed to stop ZK ",e);
+        }
+    }
+
+    /**
+     * Stops a local Zk instance.
+     *
+     * @param deleteDataDir Whether or not to delete the data directory
+     */
+    private void stopLocalZkServer(final boolean deleteDataDir) {
+        if (zkServer != null) {
+            try {
+                zkServer.shutdown();
+                zkServer = null;
+                if (deleteDataDir) {
+                    org.apache.commons.io.FileUtils.deleteDirectory(new File(dataDir));
+                }
+                isStarted.compareAndSet(true, false);
+            } catch (Exception e) {
+                logger.warn("Caught exception while stopping ZK server", e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}

+ 0 - 100
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/zk/StandaloneZKServerForTest.java

@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.server.zk;
-
-import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
-import org.apache.zookeeper.server.ServerConfig;
-import org.apache.zookeeper.server.ZooKeeperServerMain;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Properties;
-
-
-/**
- * just for test
- */
-@Ignore
-public class StandaloneZKServerForTest {
-
-    private static final Logger logger = LoggerFactory.getLogger(StandaloneZKServerForTest.class);
-
-    private static volatile ZooKeeperServerMain zkServer = null;
-
-
-    @Before
-    public void before() {
-        logger.info("standalone zookeeper server for test service start ");
-
-        ThreadPoolExecutors.getInstance().execute(new Runnable() {
-            @Override
-            public void run() {
-
-                //delete zk data dir ?
-                File zkFile = new File(System.getProperty("java.io.tmpdir"), "zookeeper");
-
-                startStandaloneServer("2000", zkFile.getAbsolutePath(), "2181", "10", "5");
-            }
-        });
-
-    }
-
-
-    /**
-     * start zk server
-     * @param tickTime  zookeeper ticktime
-     * @param dataDir zookeeper data dir
-     * @param clientPort zookeeper client port
-     * @param initLimit zookeeper init limit
-     * @param syncLimit zookeeper sync limit
-     */
-    private void startStandaloneServer(String tickTime, String dataDir, String clientPort, String initLimit, String syncLimit) {
-        Properties props = new Properties();
-        props.setProperty("tickTime", tickTime);
-        props.setProperty("dataDir", dataDir);
-        props.setProperty("clientPort", clientPort);
-        props.setProperty("initLimit", initLimit);
-        props.setProperty("syncLimit", syncLimit);
-
-        QuorumPeerConfig quorumConfig = new QuorumPeerConfig();
-        try {
-            quorumConfig.parseProperties(props);
-
-            if(zkServer == null ){
-
-                synchronized (StandaloneZKServerForTest.class){
-                    if(zkServer == null ){
-                        zkServer = new ZooKeeperServerMain();
-                        final ServerConfig config = new ServerConfig();
-                        config.readFrom(quorumConfig);
-                        zkServer.runFromConfig(config);
-                    }
-                }
-
-            }
-
-        } catch (Exception e) {
-            logger.error("start standalone server fail!", e);
-        }
-    }
-
-
-}