Browse Source

[Improvement][UT] Improve Worker registry coverage (#15380)

Co-authored-by: fuchanghai <changhaifu@apache.org>
Co-authored-by: Eric Gao <ericgao.apache@gmail.com>
Co-authored-by: Rick Cheng <rickchengx@gmail.com>
(cherry picked from commit 69676b445c6a87dbd97c8bfd5797a26600e1bef6)
John Huang 1 year ago
parent
commit
8e9537ad33

+ 0 - 3
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java

@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.worker.registry;
 
 import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.registry.api.StrategyType;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -34,8 +33,6 @@ public class WorkerStopStrategy implements WorkerConnectStrategy {
 
     @Autowired
     public RegistryClient registryClient;
-    @Autowired
-    private WorkerConfig workerConfig;
 
     @Override
     public void disconnect() {

+ 11 - 0
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java

@@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThread
 
 import java.time.Duration;
 
+import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
 import org.springframework.beans.factory.annotation.Autowired;
@@ -52,6 +53,16 @@ public class WorkerWaitingStrategy implements WorkerConnectStrategy {
     @Autowired
     private WorkerTaskExecutorThreadPool workerManagerThread;
 
+    public WorkerWaitingStrategy(@NonNull WorkerConfig workerConfig,
+                                 @NonNull RegistryClient registryClient,
+                                 @NonNull MessageRetryRunner messageRetryRunner,
+                                 @NonNull WorkerTaskExecutorThreadPool workerManagerThread) {
+        this.workerConfig = workerConfig;
+        this.registryClient = registryClient;
+        this.messageRetryRunner = messageRetryRunner;
+        this.workerManagerThread = workerManagerThread;
+    }
+
     @Override
     public void disconnect() {
         try {

+ 60 - 0
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListenerTest.java

@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.registry;
+
+import static org.mockito.Mockito.times;
+
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * worker registry test
+ */
+@ExtendWith(MockitoExtension.class)
+public class WorkerConnectionStateListenerTest {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerConnectionStateListenerTest.class);
+    @InjectMocks
+    private WorkerConnectionStateListener workerConnectionStateListener;
+    @Mock
+    private WorkerConfig workerConfig;
+    @Mock
+    private WorkerConnectStrategy workerConnectStrategy;
+
+    @Test
+    public void testWorkerConnectionStateListener() {
+        workerConnectionStateListener.onUpdate(ConnectionState.CONNECTED);
+
+        workerConnectionStateListener.onUpdate(ConnectionState.RECONNECTED);
+        Mockito.verify(workerConnectStrategy, times(1)).reconnect();
+
+        workerConnectionStateListener.onUpdate(ConnectionState.SUSPENDED);
+
+        workerConnectionStateListener.onUpdate(ConnectionState.DISCONNECTED);
+        Mockito.verify(workerConnectStrategy, times(1)).disconnect();
+    }
+}

+ 26 - 14
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java

@@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.worker.registry;
 
 import static org.mockito.BDDMockito.given;
 
+import org.apache.dolphinscheduler.common.IStoppable;
+import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
 import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
@@ -29,6 +31,9 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerServerLoadProtecti
 import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
 
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Optional;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -37,6 +42,8 @@ import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * worker registry test
@@ -44,26 +51,24 @@ import org.mockito.junit.jupiter.MockitoExtension;
 @ExtendWith(MockitoExtension.class)
 public class WorkerRegistryClientTest {
 
+    private static final Logger log = LoggerFactory.getLogger(WorkerRegistryClientTest.class);
     @InjectMocks
     private WorkerRegistryClient workerRegistryClient;
-
     @Mock
     private RegistryClient registryClient;
-
     @Mock
     private WorkerConfig workerConfig;
-
     @Mock
     private MetricsProvider metricsProvider;
-
     @Mock
     private WorkerTaskExecutorThreadPool workerManagerThread;
-
     @Mock
     private WorkerConnectStrategy workerConnectStrategy;
+    @Mock
+    private IStoppable stoppable;
 
     @Test
-    public void testStart() {
+    public void testWorkerRegistryClientbasic() {
 
         given(workerConfig.getWorkerAddress()).willReturn(NetUtils.getAddr(1234));
         given(workerConfig.getMaxHeartbeatInterval()).willReturn(Duration.ofSeconds(1));
@@ -75,16 +80,23 @@ public class WorkerRegistryClientTest {
         workerRegistryClient.initWorkRegistry();
         workerRegistryClient.start();
 
-        Assertions.assertTrue(true);
+        workerRegistryClient.setRegistryStoppable(stoppable);
     }
 
     @Test
-    public void testUnRegistry() {
-
-    }
-
-    @Test
-    public void testGetWorkerZkPaths() {
-
+    public void testWorkerRegistryClientgetAlertServerAddress() {
+        given(registryClient.getServerList(Mockito.any(RegistryNodeType.class)))
+                .willReturn(new ArrayList<Server>());
+        Assertions.assertEquals(workerRegistryClient.getAlertServerAddress(), Optional.empty());
+        Mockito.reset(registryClient);
+        String host = "test";
+        Integer port = 1;
+        Server server = new Server();
+        server.setHost(host);
+        server.setPort(port);
+        given(registryClient.getServerList(Mockito.any(RegistryNodeType.class)))
+                .willReturn(new ArrayList<Server>(Arrays.asList(server)));
+        Assertions.assertEquals(workerRegistryClient.getAlertServerAddress().get().getAddress(),
+                String.format("%s:%d", host, port));
     }
 }

+ 187 - 0
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStrategyTest.java

@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.registry;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.doNothing;
+
+import org.apache.dolphinscheduler.common.IStoppable;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleException;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
+import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
+import org.apache.dolphinscheduler.registry.api.StrategyType;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
+
+import java.time.Duration;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * worker registry test
+ */
+@ExtendWith(MockitoExtension.class)
+public class WorkerStrategyTest {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerStrategyTest.class);
+    @Mock
+    private RegistryClient registryClient;
+    @Mock
+    private IStoppable stoppable;
+    @Mock
+    private WorkerConfig workerConfig;
+    @Mock
+    private WorkerRpcServer workerRpcServer;
+    @Mock
+    private MessageRetryRunner messageRetryRunner;
+    @Mock
+    private WorkerTaskExecutorThreadPool workerManagerThread;
+    @Mock
+    private ConnectStrategyProperties connectStrategyProperties;
+
+    @Test
+    public void testWorkerStopStrategy() {
+        given(registryClient.getStoppable())
+                .willReturn(stoppable);
+        WorkerStopStrategy workerStopStrategy = new WorkerStopStrategy();
+        workerStopStrategy.registryClient = registryClient;
+        workerStopStrategy.reconnect();
+        workerStopStrategy.disconnect();
+        Assertions.assertEquals(workerStopStrategy.getStrategyType(), StrategyType.STOP);
+    }
+
+    @Test
+    public void testWorkerWaitingStrategyreconnect() {
+        WorkerWaitingStrategy workerWaitingStrategy = new WorkerWaitingStrategy(
+                workerConfig,
+                registryClient,
+                messageRetryRunner,
+                workerManagerThread);
+        Assertions.assertEquals(workerWaitingStrategy.getStrategyType(), StrategyType.WAITING);
+
+        try (
+                MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
+                        Mockito.mockStatic(ServerLifeCycleManager.class)) {
+            serverLifeCycleManagerMockedStatic
+                    .when(() -> ServerLifeCycleManager.isRunning())
+                    .thenReturn(true);
+            workerWaitingStrategy.reconnect();
+
+        }
+
+        try (
+                MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
+                        Mockito.mockStatic(ServerLifeCycleManager.class)) {
+            doNothing().when(stoppable).stop(anyString());
+            given(registryClient.getStoppable())
+                    .willReturn(stoppable);
+            serverLifeCycleManagerMockedStatic
+                    .when(() -> ServerLifeCycleManager.recoverFromWaiting())
+                    .thenThrow(new ServerLifeCycleException(""));
+            workerWaitingStrategy.reconnect();
+        }
+
+        try (
+                MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
+                        Mockito.mockStatic(ServerLifeCycleManager.class)) {
+            serverLifeCycleManagerMockedStatic
+                    .when(() -> ServerLifeCycleManager.recoverFromWaiting())
+                    .thenAnswer(invocation -> null);
+            workerWaitingStrategy.reconnect();
+        }
+    }
+
+    @Test
+    public void testWorkerWaitingStrategydisconnect() {
+        WorkerWaitingStrategy workerWaitingStrategy = new WorkerWaitingStrategy(
+                workerConfig,
+                registryClient,
+                messageRetryRunner,
+                workerManagerThread);
+        Assertions.assertEquals(workerWaitingStrategy.getStrategyType(), StrategyType.WAITING);
+
+        try (
+                MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
+                        Mockito.mockStatic(ServerLifeCycleManager.class)) {
+            doNothing().when(stoppable).stop(anyString());
+            given(registryClient.getStoppable())
+                    .willReturn(stoppable);
+            serverLifeCycleManagerMockedStatic
+                    .when(() -> ServerLifeCycleManager.toWaiting())
+                    .thenThrow(new ServerLifeCycleException(""));
+            workerWaitingStrategy.disconnect();
+        }
+
+        try (
+                MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
+                        Mockito.mockStatic(ServerLifeCycleManager.class)) {
+            given(connectStrategyProperties.getMaxWaitingTime()).willReturn(Duration.ofSeconds(1));
+            given(workerConfig.getRegistryDisconnectStrategy()).willReturn(connectStrategyProperties);
+            Mockito.reset(registryClient);
+            doNothing().when(registryClient).connectUntilTimeout(any());
+            serverLifeCycleManagerMockedStatic
+                    .when(() -> ServerLifeCycleManager.toWaiting())
+                    .thenAnswer(invocation -> null);
+            workerWaitingStrategy.disconnect();
+        }
+
+        try (
+                MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
+                        Mockito.mockStatic(ServerLifeCycleManager.class)) {
+            given(connectStrategyProperties.getMaxWaitingTime()).willReturn(Duration.ofSeconds(1));
+            given(workerConfig.getRegistryDisconnectStrategy()).willReturn(connectStrategyProperties);
+            Mockito.reset(registryClient);
+            doNothing().when(stoppable).stop(anyString());
+            given(registryClient.getStoppable())
+                    .willReturn(stoppable);
+            Mockito.doThrow(new RegistryException("TEST")).when(registryClient).connectUntilTimeout(any());
+            serverLifeCycleManagerMockedStatic
+                    .when(() -> ServerLifeCycleManager.toWaiting())
+                    .thenAnswer(invocation -> null);
+            workerWaitingStrategy.disconnect();
+        }
+
+        try (
+                MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
+                        Mockito.mockStatic(ServerLifeCycleManager.class)) {
+            Mockito.reset(workerConfig);
+            given(workerConfig.getRegistryDisconnectStrategy()).willThrow(new NullPointerException(""));
+            doNothing().when(stoppable).stop(anyString());
+            given(registryClient.getStoppable())
+                    .willReturn(stoppable);
+            serverLifeCycleManagerMockedStatic
+                    .when(() -> ServerLifeCycleManager.toWaiting())
+                    .thenAnswer(invocation -> null);
+            workerWaitingStrategy.disconnect();
+        }
+    }
+}