瀏覽代碼

Fix exception occur in RpcServer side, it will not be sent to RpcClient (#15536)

Wenjun Ruan 1 年之前
父節點
當前提交
86ef9666c4

+ 1 - 1
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/StandardRpcRequest.java

@@ -40,7 +40,7 @@ public class StandardRpcRequest implements IRpcRequest {
         final Class<?>[] argsTypes = new Class[args.length];
         for (int i = 0; i < args.length; i++) {
             argsBytes[i] = JsonSerializer.serialize(args[i]);
-            argsTypes[i] = args[i].getClass();
+            argsTypes[i] = args[i] == null ? null : args[i].getClass();
         }
         return new StandardRpcRequest(argsBytes, argsTypes);
     }

+ 6 - 0
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java

@@ -33,31 +33,37 @@ public class NettyServerConfig {
     /**
      * init the server connectable queue
      */
+    @Builder.Default
     private int soBacklog = 1024;
 
     /**
      * whether tpc delay
      */
+    @Builder.Default
     private boolean tcpNoDelay = true;
 
     /**
      * whether keep alive
      */
+    @Builder.Default
     private boolean soKeepalive = true;
 
     /**
      * send buffer size
      */
+    @Builder.Default
     private int sendBufferSize = 65535;
 
     /**
      * receive buffer size
      */
+    @Builder.Default
     private int receiveBufferSize = 65535;
 
     /**
      * worker threads,default get machine cpus
      */
+    @Builder.Default
     private int workerThread = Runtime.getRuntime().availableProcessors() * 2;
 
     /**

+ 6 - 1
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerImpl.java

@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.extract.base.server;
 
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
 public class ServerMethodInvokerImpl implements ServerMethodInvoker {
@@ -36,7 +37,11 @@ public class ServerMethodInvokerImpl implements ServerMethodInvoker {
     @Override
     public Object invoke(Object... args) throws Throwable {
         // todo: check the request param when register
-        return method.invoke(serviceBean, args);
+        try {
+            return method.invoke(serviceBean, args);
+        } catch (InvocationTargetException ex) {
+            throw ex.getTargetException();
+        }
     }
 
     @Override

+ 28 - 9
dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java

@@ -17,12 +17,19 @@
 
 package org.apache.dolphinscheduler.extract.base.client;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
 import org.apache.dolphinscheduler.extract.base.RpcMethod;
 import org.apache.dolphinscheduler.extract.base.RpcService;
 import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
+import org.apache.dolphinscheduler.extract.base.exception.MethodInvocationException;
 import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
 
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
+
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -32,12 +39,18 @@ public class SingletonJdkDynamicRpcClientProxyFactoryTest {
 
     private NettyRemotingServer nettyRemotingServer;
 
+    private String serverAddress;
+
     @BeforeEach
     public void setUp() {
-        nettyRemotingServer =
-                new NettyRemotingServer(NettyServerConfig.builder().serverName("ApiServer").listenPort(12345).build());
+        int listenPort = RandomUtils.nextInt(10000, 20000);
+        NettyServerConfig nettyServerConfig = NettyServerConfig.builder()
+                .serverName("ApiServer")
+                .listenPort(listenPort)
+                .build();
+        nettyRemotingServer = new NettyRemotingServer(nettyServerConfig);
         nettyRemotingServer.start();
-
+        serverAddress = "localhost:" + listenPort;
         new SpringServerMethodInvokerDiscovery(nettyRemotingServer)
                 .postProcessAfterInitialization(new IServiceImpl(), "iServiceImpl");
     }
@@ -45,23 +58,26 @@ public class SingletonJdkDynamicRpcClientProxyFactoryTest {
     @Test
     public void getProxyClient() {
         IService proxyClient =
-                SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345", IService.class);
+                SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(serverAddress, IService.class);
         Assertions.assertNotNull(proxyClient);
     }
 
     @Test
     public void testPing() {
         IService proxyClient =
-                SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345", IService.class);
-        String ping = proxyClient.ping("ping");
-        Assertions.assertEquals("pong", ping);
+                SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(serverAddress, IService.class);
+        assertEquals("pong", proxyClient.ping("ping"));
+
+        MethodInvocationException methodInvocationException =
+                Assertions.assertThrows(MethodInvocationException.class, () -> proxyClient.ping(null));
+        assertEquals("ping: null is illegal", methodInvocationException.getMessage());
     }
 
     @Test
     public void testVoid() {
         IService proxyClient =
-                SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345", IService.class);
-        Assertions.assertDoesNotThrow(proxyClient::voidMethod);
+                SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(serverAddress, IService.class);
+        assertDoesNotThrow(proxyClient::voidMethod);
     }
 
     @AfterEach
@@ -83,6 +99,9 @@ public class SingletonJdkDynamicRpcClientProxyFactoryTest {
 
         @Override
         public String ping(String ping) {
+            if (StringUtils.isEmpty(ping)) {
+                throw new IllegalArgumentException("ping: " + ping + " is illegal");
+            }
             return "pong";
         }