Browse Source

add rpc config

CalvinKirs 4 years ago
parent
commit
230f3a30d2

+ 10 - 0
dolphinscheduler-remote/pom.xml

@@ -69,6 +69,16 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.reflections</groupId>
+            <artifactId>reflections</artifactId>
+            <version>0.9.11</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
 
     </dependencies>
 </project>

+ 5 - 2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java

@@ -22,15 +22,18 @@ public class MainTest {
         Host host = new Host("127.0.0.1", 12636);
 
         IRpcClient rpcClient = new RpcClient();
-        UserService userService = rpcClient.create(UserService.class, host);
+        IUserService userService = rpcClient.create(IUserService.class, host);
         boolean result = userService.say("calvin");
         System.out.println("异步回掉成功" + result);
 
         System.out.println(userService.hi(10));
         System.out.println(userService.hi(188888888));
 
-        UserService user = rpcClient.create(UserService.class, host);
+        IUserService user = rpcClient.create(IUserService.class, host);
         System.out.println(user.hi(99999));
         System.out.println(user.hi(998888888));
+
+        System.out.println(IUserService.class.getSimpleName());
+        System.out.println(UserService.class.getSimpleName());
     }
 }

+ 5 - 1
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java

@@ -1,18 +1,22 @@
 package org.apache.dolphinscheduler.remote.rpc;
 
 import org.apache.dolphinscheduler.remote.rpc.base.Rpc;
+import org.apache.dolphinscheduler.remote.rpc.base.RpcService;
 
 /**
  * @author jiangli
  * @date 2021-01-11 21:05
  */
-public class UserService  {
+@RpcService("IUserService")
+public class UserService  implements IUserService{
 
     @Rpc(async = true, serviceCallback = UserCallback.class, retries = 9999)
+    @Override
     public Boolean say(String s) {
         return true;
     }
 
+    @Override
     public String hi(int num) {
         return "this world has " + num + "sun";
     }

+ 29 - 0
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/base/RpcService.java

@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.rpc.base;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target({ElementType.TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface RpcService {
+    String value() default "";
+}

+ 15 - 6
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfig.java

@@ -25,7 +25,9 @@ import org.apache.dolphinscheduler.remote.rpc.common.ConsumerConfigConstants;
  */
 public class ConsumerConfig {
 
-    private Class<? extends AbstractRpcCallBack> callBackClass;
+    private Class<? extends AbstractRpcCallBack> serviceCallBackClass;
+
+    private Class<? extends AbstractRpcCallBack> ackCallBackClass;
 
     private String serviceName;
 
@@ -33,13 +35,20 @@ public class ConsumerConfig {
 
     private Integer retries = ConsumerConfigConstants.DEFAULT_RETRIES;
 
-    public Class<? extends AbstractRpcCallBack> getCallBackClass() {
-        return callBackClass;
+    public Class<? extends AbstractRpcCallBack> getServiceCallBackClass() {
+        return serviceCallBackClass;
+    }
+
+    public void setServiceCallBackClass(Class<? extends AbstractRpcCallBack> serviceCallBackClass) {
+        this.serviceCallBackClass = serviceCallBackClass;
+    }
+
+    public Class<? extends AbstractRpcCallBack> getAckCallBackClass() {
+        return ackCallBackClass;
     }
 
-    //set call back class
-    void setCallBackClass(Class<? extends AbstractRpcCallBack> callBackClass) {
-        this.callBackClass = callBackClass;
+    public void setAckCallBackClass(Class<? extends AbstractRpcCallBack> ackCallBackClass) {
+        this.ackCallBackClass = ackCallBackClass;
     }
 
     public String getServiceName() {

+ 4 - 7
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java

@@ -18,7 +18,6 @@
 package org.apache.dolphinscheduler.remote.rpc.client;
 
 import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
-import org.apache.dolphinscheduler.remote.rpc.Invoker;
 import org.apache.dolphinscheduler.remote.rpc.base.Rpc;
 import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
 import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
@@ -37,9 +36,6 @@ import net.bytebuddy.implementation.bind.annotation.RuntimeType;
  */
 public class ConsumerInterceptor {
 
-    private Invoker invoker;
-
-
     private Host host;
 
     private NettyClient nettyClient = NettyClient.getInstance();
@@ -52,7 +48,7 @@ public class ConsumerInterceptor {
     public Object intercept(@AllArguments Object[] args, @Origin Method method) throws RemotingException {
         RpcRequest request = buildReq(args, method);
 
-        String serviceName = method.getDeclaringClass().getName() + method.getName();
+        String serviceName = method.getDeclaringClass().getSimpleName() + method.getName();
         ConsumerConfig consumerConfig = ConsumerConfigCache.getConfigByServersName(serviceName);
         if (null == consumerConfig) {
             consumerConfig = cacheServiceConfig(method, serviceName);
@@ -76,7 +72,7 @@ public class ConsumerInterceptor {
     private RpcRequest buildReq(Object[] args, Method method) {
         RpcRequest request = new RpcRequest();
         request.setRequestId(UUID.randomUUID().toString());
-        request.setClassName(method.getDeclaringClass().getName());
+        request.setClassName(method.getDeclaringClass().getSimpleName());
         request.setMethodName(method.getName());
         request.setParameterTypes(method.getParameterTypes());
 
@@ -94,7 +90,8 @@ public class ConsumerInterceptor {
         if (annotationPresent) {
             Rpc rpc = method.getAnnotation(Rpc.class);
             consumerConfig.setAsync(rpc.async());
-            consumerConfig.setCallBackClass(rpc.callback());
+            consumerConfig.setServiceCallBackClass(rpc.serviceCallback());
+            consumerConfig.setAckCallBackClass(rpc.ackCallback());
             consumerConfig.setRetries(rpc.retries());
         }
 

+ 42 - 0
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RequestEventType.java

@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.rpc.common;
+
+public enum RequestEventType {
+
+    HEARTBEAT((byte)1,"heartbeat"),
+    BUSINESS((byte)2,"business request");
+
+
+    private Byte type;
+
+    private String description;
+
+    RequestEventType(Byte type, String description) {
+        this.type = type;
+        this.description = description;
+    }
+
+    public Byte getType() {
+        return type;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+}

+ 41 - 0
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/ResponseEventType.java

@@ -0,0 +1,41 @@
+package org.apache.dolphinscheduler.remote.rpc.common;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public enum ResponseEventType {
+
+    ACK((byte) 1, "ack"),
+    BUSINESS_RSP((byte) 2, "business response");
+
+    private Byte type;
+
+    private String description;
+
+    ResponseEventType(Byte type, String description) {
+        this.type = type;
+        this.description = description;
+    }
+
+
+    public Byte getType() {
+        return type;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+}

+ 13 - 1
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java

@@ -27,9 +27,13 @@ public class RpcRequest {
     private String methodName;
     private Class<?>[] parameterTypes;
     private Object[] parameters;
-    // 0 hear beat,1 businness msg
+    /**
+     * @see RequestEventType
+     */
     private Byte eventType = 1;
 
+    private Boolean ack;
+
     public Byte getEventType() {
         return eventType;
     }
@@ -77,4 +81,12 @@ public class RpcRequest {
     public void setParameters(Object[] parameters) {
         this.parameters = parameters;
     }
+
+    public Boolean getAck() {
+        return ack;
+    }
+
+    public void setAck(Boolean ack) {
+        this.ack = ack;
+    }
 }

+ 5 - 1
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcResponse.java

@@ -26,7 +26,11 @@ public class RpcResponse {
     private String msg;
     private Object result;
     private Byte status;
-    private Integer responseType;
+
+    /**
+     * @see ResponseEventType
+     */
+    private Byte responseType;
 
     public String getRequestId() {
         return requestId;

+ 68 - 0
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/config/ServiceBean.java

@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.rpc.config;
+
+import org.apache.dolphinscheduler.remote.rpc.IUserService;
+import org.apache.dolphinscheduler.remote.rpc.base.RpcService;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+import org.reflections.Reflections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ServiceBean {
+
+    private static final Logger logger = LoggerFactory.getLogger(ServiceBean.class);
+
+    private static Map<String, Object> serviceMap = new HashMap<>();
+
+    private static AtomicBoolean initialized = new AtomicBoolean(false);
+
+    private static synchronized void init() {
+        Reflections f = new Reflections("org/apache/dolphinscheduler/remote/rpc");
+
+
+        List<Class<?>> list = new ArrayList<>(f.getTypesAnnotatedWith(RpcService.class));
+        list.forEach(rpcClass -> {
+            RpcService rpcService = rpcClass.getAnnotation(RpcService.class);
+            serviceMap.put(rpcService.value(), rpcClass);
+        });
+    }
+
+    public static void main(String[] args) {
+        init();
+    }
+
+    public static Class getServiceClass(String className) {
+        if (initialized.get()) {
+            return (Class) serviceMap.get(className);
+        } else {
+            init();
+        }
+        return (Class) serviceMap.get(className);
+    }
+
+}

+ 11 - 1
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java

@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
 import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
 import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture;
 
+import java.lang.reflect.InvocationTargetException;
 import java.net.InetSocketAddress;
 
 import org.slf4j.Logger;
@@ -75,7 +76,12 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
         new FastThreadLocalThread(() -> {
             try {
                 if (rsp.getStatus() == 0) {
-                    consumerConfig.getCallBackClass().newInstance().run(rsp.getResult());
+                    try {
+                        consumerConfig.getServiceCallBackClass().getDeclaredConstructor().newInstance().run(rsp.getResult());
+                    } catch (InvocationTargetException | NoSuchMethodException e) {
+                       logger.error("rpc call back error",e);
+                    }
+
                 } else {
                     logger.error("xxxx fail");
                 }
@@ -108,4 +114,8 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
         logger.error("exceptionCaught : {}", cause.getMessage(), cause);
         ctx.channel().close();
     }
+
+    private void executeAsyncHandler(){
+
+    }
 }

+ 9 - 3
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java

@@ -17,10 +17,16 @@
 
 package org.apache.dolphinscheduler.remote.rpc.remote;
 
+import org.apache.dolphinscheduler.remote.rpc.IUserService;
+import org.apache.dolphinscheduler.remote.rpc.base.RpcService;
 import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
 import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
+import org.apache.dolphinscheduler.remote.rpc.config.ServiceBean;
 
 import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ServiceLoader;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,11 +68,12 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
         RpcResponse response = new RpcResponse();
         if (req.getEventType() == 0) {
 
-            logger.info("接受心跳消息!...");
+            logger.info("accept heartbeat msg");
             return;
         }
         response.setRequestId(req.getRequestId());
 
+
         response.setStatus((byte) 0);
 
 
@@ -79,8 +86,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
         Object[] arguments = req.getParameters();
         Object result = null;
         try {
-
-            Class serviceClass = Class.forName(classname);
+            Class serviceClass = ServiceBean.getServiceClass(classname);
 
             Object object = serviceClass.newInstance();
 

+ 1 - 0
pom.xml

@@ -1101,5 +1101,6 @@
         <module>dolphinscheduler-service</module>
         <module>dolphinscheduler-spi</module>
         <module>dolphinscheduler-microbench</module>
+        <module>dolphinscheduler-remote-connfig</module>
     </modules>
 </project>