Browse Source

[FIX_BUG_1.3#3789][remote]support netty heart beat (#3868)

* [FIX_BUG_1.3#3789][remote]support netty heart beat

* netty heart beat

* code style

* code style

* code style

* code style

* code style

* delete code docs

* client fail-close
Kirs 4 years ago
parent
commit
08dc6913dc

+ 30 - 21
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java

@@ -17,17 +17,6 @@
 
 package org.apache.dolphinscheduler.remote;
 
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-
 import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
 import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
 import org.apache.dolphinscheduler.remote.command.Command;
@@ -41,19 +30,40 @@ import org.apache.dolphinscheduler.remote.future.ReleaseSemaphore;
 import org.apache.dolphinscheduler.remote.future.ResponseFuture;
 import org.apache.dolphinscheduler.remote.handler.NettyClientHandler;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.remote.utils.CallerThreadExecutePolicy;
+import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.remote.utils.NettyUtils;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.net.InetSocketAddress;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
+
 /**
  * remoting netty client
  */
@@ -162,11 +172,10 @@ public class NettyRemotingClient {
             .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis())
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
-                public void initChannel(SocketChannel ch) throws Exception {
-                    ch.pipeline().addLast(
-                        new NettyDecoder(),
-                        clientHandler,
-                        encoder);
+                public void initChannel(SocketChannel ch) {
+                    ch.pipeline()
+                        .addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS))
+                        .addLast(new NettyDecoder(), clientHandler, encoder);
                 }
             });
         this.responseFutureExecutor.scheduleAtFixedRate(new Runnable() {

+ 7 - 5
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java

@@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.remote.utils.NettyUtils;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -39,11 +40,11 @@ import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
 
 /**
  * remoting netty server
@@ -183,10 +184,11 @@ public class NettyRemotingServer {
      * @param ch socket channel
      */
     private void initNettyChannel(SocketChannel ch) {
-        ChannelPipeline pipeline = ch.pipeline();
-        pipeline.addLast("encoder", encoder);
-        pipeline.addLast("decoder", new NettyDecoder());
-        pipeline.addLast("handler", serverHandler);
+        ch.pipeline()
+            .addLast("encoder", encoder)
+            .addLast("decoder", new NettyDecoder())
+            .addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS))
+            .addLast("handler", serverHandler);
     }
 
     /**

File diff suppressed because it is too large
+ 105 - 1
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java


+ 50 - 25
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java

@@ -14,9 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.remote.handler;
 
-import io.netty.channel.*;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -25,16 +25,24 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
 import org.apache.dolphinscheduler.remote.utils.Constants;
 import org.apache.dolphinscheduler.remote.utils.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleStateEvent;
+
 /**
- *  netty client request handler
+ * netty client request handler
  */
 @ChannelHandler.Sharable
 public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@@ -42,12 +50,14 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
     private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
 
     /**
-     *  netty client
+     * netty client
      */
     private final NettyRemotingClient nettyRemotingClient;
 
+    private static byte[] heartBeatData = "heart_beat".getBytes();
+
     /**
-     *  callback thread executor
+     * callback thread executor
      */
     private final ExecutorService callbackExecutor;
 
@@ -57,19 +67,19 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
     private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors;
 
     /**
-     *  default executor
+     * default executor
      */
     private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
 
-    public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor){
+    public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor) {
         this.nettyRemotingClient = nettyRemotingClient;
         this.callbackExecutor = callbackExecutor;
         this.processors = new ConcurrentHashMap();
     }
 
     /**
-     *  When the current channel is not active,
-     *  the current channel has reached the end of its life cycle
+     * When the current channel is not active,
+     * the current channel has reached the end of its life cycle
      *
      * @param ctx channel handler context
      * @throws Exception
@@ -81,7 +91,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
     }
 
     /**
-     *  The current channel reads data from the remote
+     * The current channel reads data from the remote
      *
      * @param ctx channel handler context
      * @param msg message
@@ -89,55 +99,55 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
      */
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        processReceived(ctx.channel(), (Command)msg);
+        processReceived(ctx.channel(), (Command) msg);
     }
 
     /**
      * register processor
      *
      * @param commandType command type
-     * @param processor processor
+     * @param processor   processor
      */
     public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
         this.registerProcessor(commandType, processor, null);
     }
 
     /**
-     *  register processor
+     * register processor
      *
      * @param commandType command type
-     * @param processor processor
-     * @param executor thread executor
+     * @param processor   processor
+     * @param executor    thread executor
      */
     public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
         ExecutorService executorRef = executor;
-        if(executorRef == null){
+        if (executorRef == null) {
             executorRef = defaultExecutor;
         }
         this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef));
     }
 
     /**
-     *  process received logic
+     * process received logic
      *
      * @param command command
      */
     private void processReceived(final Channel channel, final Command command) {
         ResponseFuture future = ResponseFuture.getFuture(command.getOpaque());
-        if(future != null){
+        if (future != null) {
             future.setResponseCommand(command);
             future.release();
-            if(future.getInvokeCallback() != null){
+            if (future.getInvokeCallback() != null) {
                 this.callbackExecutor.submit(new Runnable() {
                     @Override
                     public void run() {
                         future.executeInvokeCallback();
                     }
                 });
-            } else{
+            } else {
                 future.putResponse(command);
             }
-        } else{
+        } else {
             processByCommandType(channel, command);
         }
     }
@@ -163,9 +173,10 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
     }
 
     /**
-     *  caught exception
-     * @param ctx channel handler context
-     * @param cause  cause
+     * caught exception
+     *
+     * @param ctx   channel handler context
+     * @param cause cause
      * @throws Exception
      */
     @Override
@@ -175,4 +186,18 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
         ctx.channel().close();
     }
 
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        if (evt instanceof IdleStateEvent) {
+            Command heartBeat = new Command();
+            heartBeat.setType(CommandType.HEART_BEAT);
+            heartBeat.setBody(heartBeatData);
+            ctx.writeAndFlush(heartBeat)
+                .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
+
+        } else {
+            super.userEventTriggered(ctx, evt);
+        }
+    }
+
 }

+ 43 - 24
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java

@@ -17,22 +17,30 @@
 
 package org.apache.dolphinscheduler.remote.handler;
 
-import io.netty.channel.*;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
 import org.apache.dolphinscheduler.remote.utils.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleStateEvent;
+
+
 /**
- *  netty server request handler
+ * netty server request handler
  */
 @ChannelHandler.Sharable
 public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@@ -40,22 +48,23 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
     private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
 
     /**
-     *  netty remote server
+     * netty remote server
      */
     private final NettyRemotingServer nettyRemotingServer;
 
     /**
-     *  server processors queue
+     * server processors queue
      */
     private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap();
 
-    public NettyServerHandler(NettyRemotingServer nettyRemotingServer){
+    public NettyServerHandler(NettyRemotingServer nettyRemotingServer) {
         this.nettyRemotingServer = nettyRemotingServer;
     }
 
     /**
-     *  When the current channel is not active,
-     *  the current channel has reached the end of its life cycle
+     * When the current channel is not active,
+     * the current channel has reached the end of its life cycle
+     *
      * @param ctx channel handler context
      * @throws Exception
      */
@@ -73,38 +82,39 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
      */
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        processReceived(ctx.channel(), (Command)msg);
+        processReceived(ctx.channel(), (Command) msg);
     }
 
     /**
      * register processor
      *
      * @param commandType command type
-     * @param processor processor
+     * @param processor   processor
      */
     public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
         this.registerProcessor(commandType, processor, null);
     }
 
     /**
-     *  register processor
+     * register processor
      *
      * @param commandType command type
-     * @param processor processor
-     * @param executor thread executor
+     * @param processor   processor
+     * @param executor    thread executor
      */
     public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
         ExecutorService executorRef = executor;
-        if(executorRef == null){
+        if (executorRef == null) {
             executorRef = nettyRemotingServer.getDefaultExecutor();
         }
         this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef));
     }
 
     /**
-     *  process received logic
+     * process received logic
+     *
      * @param channel channel
-     * @param msg message
+     * @param msg     message
      */
     private void processReceived(final Channel channel, final Command msg) {
         final CommandType commandType = msg.getType();
@@ -132,22 +142,22 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
     }
 
     /**
-     *  caught exception
+     * caught exception
      *
-     * @param ctx channel handler context
+     * @param ctx   channel handler context
      * @param cause cause
      * @throws Exception
      */
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        logger.error("exceptionCaught : {}",cause.getMessage(), cause);
+        logger.error("exceptionCaught : {}", cause.getMessage(), cause);
         ctx.channel().close();
     }
 
     /**
-     *  channel write changed
+     * channel write changed
      *
-     * @param ctx  channel handler context
+     * @param ctx channel handler context
      * @throws Exception
      */
     @Override
@@ -158,16 +168,25 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
         if (!ch.isWritable()) {
             if (logger.isWarnEnabled()) {
                 logger.warn("{} is not writable, over high water level : {}",
-                        ch, config.getWriteBufferHighWaterMark());
+                    ch, config.getWriteBufferHighWaterMark());
             }
 
             config.setAutoRead(false);
         } else {
             if (logger.isWarnEnabled()) {
                 logger.warn("{} is writable, to low water : {}",
-                        ch, config.getWriteBufferLowWaterMark());
+                    ch, config.getWriteBufferLowWaterMark());
             }
             config.setAutoRead(true);
         }
     }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        if (evt instanceof IdleStateEvent) {
+            ctx.channel().close();
+        } else {
+            super.userEventTriggered(ctx, evt);
+        }
+    }
 }

+ 4 - 1
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java

@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.remote.utils;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 
-
 /**
  * constant
  */
@@ -30,6 +29,10 @@ public class Constants {
 
     public static final String SLASH = "/";
 
+    public static final int NETTY_SERVER_HEART_BEAT_TIME = 1000 * 60 * 3 + 1000;
+
+    public static final int NETTY_CLIENT_HEART_BEAT_TIME = 1000 * 60;
+
     /**
      * charset
      */