Browse Source

[Improvement][worker] Optimize KillYarnJob (4939) (#4943)

wenjun 4 years ago
parent
commit
ea6b1de120

+ 27 - 0
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java

@@ -19,6 +19,10 @@ package org.apache.dolphinscheduler.common.utils;
 
 import org.apache.dolphinscheduler.common.Constants;
 
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -26,6 +30,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * logger utils
@@ -36,6 +41,8 @@ public class LoggerUtils {
         throw new UnsupportedOperationException("Construct LoggerUtils");
     }
 
+    private static final Logger logger = LoggerFactory.getLogger(LoggerUtils.class);
+
     /**
      * rules for extracting application ID
      */
@@ -101,6 +108,26 @@ public class LoggerUtils {
         return appIds;
     }
 
+    /**
+     * read whole file content
+     *
+     * @param filePath file path
+     * @return whole file content
+     */
+    public static String readWholeFileContent(String filePath) {
+        String line;
+        StringBuilder sb = new StringBuilder();
+        try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)))) {
+            while ((line = br.readLine()) != null) {
+                sb.append(line + "\r\n");
+            }
+            return sb.toString();
+        } catch (IOException e) {
+            logger.error("read file error", e);
+        }
+        return "";
+    }
+
     public static void logError(Optional<Logger> optionalLogger
             , String error) {
         optionalLogger.ifPresent((Logger logger) -> logger.error(error));

+ 48 - 6
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/LoggerUtilsTest.java

@@ -14,30 +14,72 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.common.utils;
 
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.Optional;
+
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.Test.None;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({LoggerUtils.class})
 public class LoggerUtilsTest {
     private Logger logger = LoggerFactory.getLogger(LoggerUtilsTest.class);
 
     @Test
     public void buildTaskId() {
 
-      String taskId = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,79,4084,15210);
+        String taskId = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, 79, 4084, 15210);
 
-      Assert.assertEquals(" - [taskAppId=TASK-79-4084-15210]", taskId);
+        Assert.assertEquals(" - [taskAppId=TASK-79-4084-15210]", taskId);
     }
 
     @Test
     public void getAppIds() {
-       List<String> appIdList =  LoggerUtils.getAppIds("Running job: application_1_1",logger);
-       Assert.assertEquals("application_1_1", appIdList.get(0));
+        List<String> appIdList = LoggerUtils.getAppIds("Running job: application_1_1", logger);
+        Assert.assertEquals("application_1_1", appIdList.get(0));
+
+    }
+
+    @Test
+    public void testReadWholeFileContent() throws Exception {
+        BufferedReader bufferedReader = PowerMockito.mock(BufferedReader.class);
+        PowerMockito.whenNew(BufferedReader.class).withAnyArguments().thenReturn(bufferedReader);
+        PowerMockito.when(bufferedReader.readLine()).thenReturn("").thenReturn(null);
+        FileInputStream fileInputStream = PowerMockito.mock(FileInputStream.class);
+        PowerMockito.whenNew(FileInputStream.class).withAnyArguments().thenReturn(fileInputStream);
+
+        InputStreamReader inputStreamReader = PowerMockito.mock(InputStreamReader.class);
+        PowerMockito.whenNew(InputStreamReader.class).withAnyArguments().thenReturn(inputStreamReader);
+
+        String log = LoggerUtils.readWholeFileContent("/tmp/log");
+        Assert.assertNotNull(log);
+
+        PowerMockito.when(bufferedReader.readLine()).thenThrow(new IOException());
+        log = LoggerUtils.readWholeFileContent("/tmp/log");
+        Assert.assertNotNull(log);
+    }
+
+    @Test(expected = None.class)
+    public void testLogError() {
+        Optional<Logger> loggerOptional = Optional.of(this.logger);
 
+        LoggerUtils.logError(loggerOptional, "error message");
+        LoggerUtils.logError(loggerOptional, new RuntimeException("error message"));
+        LoggerUtils.logError(loggerOptional, "error message", new RuntimeException("runtime exception"));
+        LoggerUtils.logInfo(loggerOptional, "info message");
     }
 }

+ 2 - 26
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java

@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.log;
 
 import org.apache.dolphinscheduler.common.utils.IOUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand;
@@ -31,13 +32,11 @@ import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
 import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 
-import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Collections;
@@ -86,7 +85,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
             case VIEW_WHOLE_LOG_REQUEST:
                 ViewLogRequestCommand viewLogRequest = JSONUtils.parseObject(
                         command.getBody(), ViewLogRequestCommand.class);
-                String msg = readWholeFileContent(viewLogRequest.getPath());
+                String msg = LoggerUtils.readWholeFileContent(viewLogRequest.getPath());
                 ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg);
                 channel.writeAndFlush(viewLogResponse.convert2Command(command.getOpaque()));
                 break;
@@ -182,27 +181,4 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
         return Collections.emptyList();
     }
 
-    /**
-     * read whole file content
-     *
-     * @param filePath file path
-     * @return whole file content
-     */
-    private String readWholeFileContent(String filePath) {
-        BufferedReader br = null;
-        String line;
-        StringBuilder sb = new StringBuilder();
-        try {
-            br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
-            while ((line = br.readLine()) != null) {
-                sb.append(line + "\r\n");
-            }
-            return sb.toString();
-        } catch (IOException e) {
-            logger.error("read file error",e);
-        } finally {
-            IOUtils.closeQuietly(br);
-        }
-        return "";
-    }
 }

+ 56 - 0
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessorTest.java

@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.log;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
+
+import org.junit.Test;
+import org.junit.Test.None;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import io.netty.channel.Channel;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({LoggerUtils.class})
+public class LoggerRequestProcessorTest {
+
+    @Test(expected = None.class)
+    public void testProcessViewWholeLogRequest() {
+        Channel channel = PowerMockito.mock(Channel.class);
+        PowerMockito.when(channel.writeAndFlush(Mockito.any(Command.class))).thenReturn(null);
+        PowerMockito.mockStatic(LoggerUtils.class);
+        PowerMockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn("");
+
+        ViewLogRequestCommand logRequestCommand = new ViewLogRequestCommand("/log/path");
+
+        Command command = new Command();
+        command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST);
+        command.setBody(JSONUtils.toJsonByteArray(logRequestCommand));
+
+        LoggerRequestProcessor loggerRequestProcessor = new LoggerRequestProcessor();
+        loggerRequestProcessor.process(channel, command);
+    }
+}

+ 12 - 6
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java

@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.service.log;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand;
@@ -30,6 +31,7 @@ import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
 import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.remote.utils.IPUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -116,12 +118,16 @@ public class LogClientService {
         String result = "";
         final Host address = new Host(host, port);
         try {
-            Command command = request.convert2Command();
-            Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
-            if (response != null) {
-                ViewLogResponseCommand viewLog = JSONUtils.parseObject(
-                        response.getBody(), ViewLogResponseCommand.class);
-                return viewLog.getMsg();
+            if (IPUtils.getLocalHost().equals(host)) {
+                result = LoggerUtils.readWholeFileContent(request.getPath());
+            } else {
+                Command command = request.convert2Command();
+                Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
+                if (response != null) {
+                    ViewLogResponseCommand viewLog = JSONUtils.parseObject(
+                            response.getBody(), ViewLogResponseCommand.class);
+                    result = viewLog.getMsg();
+                }
             }
         } catch (Exception e) {
             logger.error("view log error", e);

+ 143 - 0
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientServiceTest.java

@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.log;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.log.GetLogBytesResponseCommand;
+import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogResponseCommand;
+import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand;
+import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.remote.utils.IPUtils;
+
+import java.nio.charset.StandardCharsets;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.Test.None;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({LogClientService.class, IPUtils.class, LoggerUtils.class, NettyRemotingClient.class})
+public class LogClientServiceTest {
+
+    @Test
+    public void testViewLogFromLocal() {
+        String localMachine = "LOCAL_MACHINE";
+        int port = 1234;
+        String path = "/tmp/log";
+
+        PowerMockito.mockStatic(IPUtils.class);
+        PowerMockito.when(IPUtils.getLocalHost()).thenReturn(localMachine);
+        PowerMockito.mockStatic(LoggerUtils.class);
+        PowerMockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn("application_xx_11");
+
+        LogClientService logClientService = new LogClientService();
+        String log = logClientService.viewLog(localMachine, port, path);
+        Assert.assertNotNull(log);
+    }
+
+    @Test
+    public void testViewLogFromRemote() throws Exception {
+        String localMachine = "LOCAL_MACHINE";
+        int port = 1234;
+        String path = "/tmp/log";
+
+        PowerMockito.mockStatic(IPUtils.class);
+        PowerMockito.when(IPUtils.getLocalHost()).thenReturn(localMachine + "1");
+
+        NettyRemotingClient remotingClient = PowerMockito.mock(NettyRemotingClient.class);
+        PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient);
+
+        Command command = new Command();
+        command.setBody(JSONUtils.toJsonString(new ViewLogResponseCommand("")).getBytes(StandardCharsets.UTF_8));
+        PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
+                .thenReturn(command);
+        LogClientService logClientService = new LogClientService();
+        String log = logClientService.viewLog(localMachine, port, path);
+        Assert.assertNotNull(log);
+    }
+
+    @Test(expected = None.class)
+    public void testClose() throws Exception {
+        NettyRemotingClient remotingClient = PowerMockito.mock(NettyRemotingClient.class);
+        PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient);
+        PowerMockito.doNothing().when(remotingClient).close();
+
+        LogClientService logClientService = new LogClientService();
+        logClientService.close();
+    }
+
+    @Test
+    public void testRollViewLog() throws Exception {
+        NettyRemotingClient remotingClient = PowerMockito.mock(NettyRemotingClient.class);
+        PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient);
+
+        Command command = new Command();
+        command.setBody(JSONUtils.toJsonByteArray(new RollViewLogResponseCommand("success")));
+        PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
+                .thenReturn(command);
+
+        LogClientService logClientService = new LogClientService();
+        String msg = logClientService.rollViewLog("localhost", 1234, "/tmp/log", 0, 10);
+        Assert.assertNotNull(msg);
+    }
+
+    @Test
+    public void testGetLogBytes() throws Exception {
+        NettyRemotingClient remotingClient = PowerMockito.mock(NettyRemotingClient.class);
+        PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient);
+
+        Command command = new Command();
+        command.setBody(JSONUtils.toJsonByteArray(new GetLogBytesResponseCommand("log".getBytes(StandardCharsets.UTF_8))));
+        PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
+                .thenReturn(command);
+
+        LogClientService logClientService = new LogClientService();
+        byte[] logBytes = logClientService.getLogBytes("localhost", 1234, "/tmp/log");
+        Assert.assertNotNull(logBytes);
+    }
+
+    @Test
+    public void testRemoveTaskLog() throws Exception {
+        NettyRemotingClient remotingClient = PowerMockito.mock(NettyRemotingClient.class);
+        PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient);
+
+        Command command = new Command();
+        command.setBody(JSONUtils.toJsonByteArray(new RemoveTaskLogResponseCommand(true)));
+        PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
+                .thenReturn(command);
+
+        LogClientService logClientService = new LogClientService();
+        Boolean status = logClientService.removeTaskLog("localhost", 1234, "/log/path");
+        Assert.assertTrue(status);
+    }
+
+    @Test
+    public void testIsRunning() {
+        LogClientService logClientService = new LogClientService();
+        Assert.assertTrue(logClientService.isRunning());
+    }
+}

+ 2 - 1
pom.xml

@@ -908,6 +908,7 @@
                         <include>**/server/entity/SQLTaskExecutionContextTest.java</include>
                         <include>**/server/log/MasterLogFilterTest.java</include>
                         <include>**/server/log/SensitiveDataConverterTest.java</include>
+                        <include>**/server/log/LoggerRequestProcessorTest.java</include>
                         <!--<include>**/server/log/TaskLogDiscriminatorTest.java</include>-->
                         <include>**/server/log/TaskLogFilterTest.java</include>
                         <include>**/server/log/WorkerLogFilterTest.java</include>
@@ -966,7 +967,7 @@
                         <include>**/service/zk/RegisterOperatorTest.java</include>
                         <include>**/service/queue/TaskUpdateQueueTest.java</include>
                         <include>**/service/queue/PeerTaskInstancePriorityQueueTest.java</include>
-
+                        <include>**/service/log/LogClientServiceTest.java</include>
                         <include>**/service/alert/AlertClientServiceTest.java</include>
                         <include>**/dao/mapper/DataSourceUserMapperTest.java</include>
                         <!--<iTaskUpdateQueueConsumerThreadnclude>**/dao/mapper/ErrorCommandMapperTest.java</iTaskUpdateQueueConsumerThreadnclude>-->