Browse Source

[Improvement-13653][Remote Logging] Support api-server to get task log from remote target (#13654)

Rick Cheng 2 years ago
parent
commit
85605bbef9

+ 49 - 10
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java

@@ -26,6 +26,8 @@ import org.apache.dolphinscheduler.api.service.LoggerService;
 import org.apache.dolphinscheduler.api.service.ProjectService;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
+import org.apache.dolphinscheduler.common.utils.LogUtils;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.ResponseTaskLog;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@@ -37,9 +39,13 @@ import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.service.log.LogClient;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 
+import java.io.File;
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.Map;
 
 import lombok.extern.slf4j.Slf4j;
@@ -198,23 +204,39 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
      */
     private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) {
         Host host = Host.of(taskInstance.getHost());
+        String logPath = taskInstance.getLogPath();
 
         log.info("Query task instance log, taskInstanceId:{}, taskInstanceName:{}, host:{}, logPath:{}, port:{}",
-                taskInstance.getId(), taskInstance.getName(), host.getIp(), taskInstance.getLogPath(), host.getPort());
+                taskInstance.getId(), taskInstance.getName(), host.getIp(), logPath, host.getPort());
 
-        StringBuilder log = new StringBuilder();
+        StringBuilder sb = new StringBuilder();
         if (skipLineNum == 0) {
             String head = String.format(LOG_HEAD_FORMAT,
-                    taskInstance.getLogPath(),
+                    logPath,
                     host,
                     Constants.SYSTEM_LINE_SEPARATOR);
-            log.append(head);
+            sb.append(head);
         }
 
-        log.append(logClient
-                .rollViewLog(host.getIp(), host.getPort(), taskInstance.getLogPath(), skipLineNum, limit));
+        String logContent = logClient
+                .rollViewLog(host.getIp(), host.getPort(), logPath, skipLineNum, limit);
 
-        return log.toString();
+        if (skipLineNum == 0 && StringUtils.isEmpty(logContent) && RemoteLogUtils.isRemoteLoggingEnable()) {
+            // When getting the log for the first time (skipLineNum=0) returns empty, get the log from remote target
+            try {
+                log.info("Get log {} from remote target", logPath);
+                RemoteLogUtils.getRemoteLog(logPath);
+                List<String> lines = LogUtils.readPartFileContentFromLocal(logPath, skipLineNum, limit);
+                logContent = LogUtils.rollViewLogLines(lines);
+                FileUtils.delete(new File(logPath));
+            } catch (IOException e) {
+                log.error("Error while getting log from remote target", e);
+            }
+        }
+
+        sb.append(logContent);
+
+        return sb.toString();
     }
 
     /**
@@ -225,11 +247,28 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
      */
     private byte[] getLogBytes(TaskInstance taskInstance) {
         Host host = Host.of(taskInstance.getHost());
+        String logPath = taskInstance.getLogPath();
+
         byte[] head = String.format(LOG_HEAD_FORMAT,
-                taskInstance.getLogPath(),
+                logPath,
                 host,
                 Constants.SYSTEM_LINE_SEPARATOR).getBytes(StandardCharsets.UTF_8);
-        return Bytes.concat(head,
-                logClient.getLogBytes(host.getIp(), host.getPort(), taskInstance.getLogPath()));
+
+        byte[] logBytes = logClient.getLogBytes(host.getIp(), host.getPort(), logPath);
+
+        if (logBytes.length == 0 && RemoteLogUtils.isRemoteLoggingEnable()) {
+            // get task log from remote target
+            try {
+                log.info("Get log {} from remote target", logPath);
+                RemoteLogUtils.getRemoteLog(logPath);
+                File logFile = new File(logPath);
+                logBytes = FileUtils.readFileToByteArray(logFile);
+                FileUtils.delete(logFile);
+            } catch (IOException e) {
+                log.error("Error while getting log from remote target", e);
+            }
+        }
+
+        return Bytes.concat(head, logBytes);
     }
 }

+ 163 - 0
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LogUtils.java

@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.utils;
+
+import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class LogUtils {
+
+    public static byte[] getFileContentBytesFromLocal(String filePath) {
+        try (
+                InputStream in = new FileInputStream(filePath);
+                ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+            byte[] buf = new byte[1024];
+            int len;
+            while ((len = in.read(buf)) != -1) {
+                bos.write(buf, 0, len);
+            }
+            return bos.toByteArray();
+        } catch (IOException e) {
+            log.error("get file bytes error", e);
+        }
+        return new byte[0];
+    }
+
+    public static byte[] getFileContentBytesFromRemote(String filePath) {
+        RemoteLogUtils.getRemoteLog(filePath);
+        return getFileContentBytesFromLocal(filePath);
+    }
+
+    public static byte[] getFileContentBytes(String filePath) {
+        File file = new File(filePath);
+        if (file.exists()) {
+            return getFileContentBytesFromLocal(filePath);
+        }
+        if (RemoteLogUtils.isRemoteLoggingEnable()) {
+            return getFileContentBytesFromRemote(filePath);
+        }
+        return getFileContentBytesFromLocal(filePath);
+    }
+
+    public static List<String> readPartFileContentFromLocal(String filePath,
+                                                            int skipLine,
+                                                            int limit) {
+        File file = new File(filePath);
+        if (file.exists() && file.isFile()) {
+            try (Stream<String> stream = Files.lines(Paths.get(filePath))) {
+                return stream.skip(skipLine).limit(limit).collect(Collectors.toList());
+            } catch (IOException e) {
+                log.error("read file error", e);
+                throw new RuntimeException(String.format("Read file: %s error", filePath), e);
+            }
+        } else {
+            throw new RuntimeException("The file path: " + filePath + " not exists");
+        }
+    }
+
+    public static List<String> readPartFileContentFromRemote(String filePath,
+                                                             int skipLine,
+                                                             int limit) {
+        RemoteLogUtils.getRemoteLog(filePath);
+        return readPartFileContentFromLocal(filePath, skipLine, limit);
+    }
+
+    public static List<String> readPartFileContent(String filePath,
+                                                   int skipLine,
+                                                   int limit) {
+        File file = new File(filePath);
+        if (file.exists()) {
+            return readPartFileContentFromLocal(filePath, skipLine, limit);
+        }
+        if (RemoteLogUtils.isRemoteLoggingEnable()) {
+            return readPartFileContentFromRemote(filePath, skipLine, limit);
+        }
+        return readPartFileContentFromLocal(filePath, skipLine, limit);
+    }
+
+    public static String readWholeFileContentFromRemote(String filePath) {
+        RemoteLogUtils.getRemoteLog(filePath);
+        return LogUtils.readWholeFileContentFromLocal(filePath);
+    }
+
+    public static String readWholeFileContentFromLocal(String filePath) {
+        String line;
+        StringBuilder sb = new StringBuilder();
+        try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)))) {
+            while ((line = br.readLine()) != null) {
+                sb.append(line + "\r\n");
+            }
+            return sb.toString();
+        } catch (IOException e) {
+            log.error("read file error", e);
+        }
+        return "";
+    }
+
+    public static String readWholeFileContent(String filePath) {
+        File file = new File(filePath);
+        if (file.exists()) {
+            return readWholeFileContentFromLocal(filePath);
+        }
+        if (RemoteLogUtils.isRemoteLoggingEnable()) {
+            return readWholeFileContentFromRemote(filePath);
+        }
+        return readWholeFileContentFromLocal(filePath);
+    }
+
+    public static String rollViewLogLines(List<String> lines) {
+        StringBuilder builder = new StringBuilder();
+        final int MaxResponseLogSize = 65535;
+        int totalLogByteSize = 0;
+        for (String line : lines) {
+            // If a single line of log is exceed max response size, cut off the line
+            final int lineByteSize = line.getBytes(StandardCharsets.UTF_8).length;
+            if (lineByteSize >= MaxResponseLogSize) {
+                builder.append(line, 0, MaxResponseLogSize)
+                        .append(" [this line's size ").append(lineByteSize).append(" bytes is exceed ")
+                        .append(MaxResponseLogSize).append(" bytes, so only ")
+                        .append(MaxResponseLogSize).append(" characters are reserved for performance reasons.]")
+                        .append("\r\n");
+            } else {
+                builder.append(line).append("\r\n");
+            }
+            totalLogByteSize += lineByteSize;
+            if (totalLogByteSize >= MaxResponseLogSize) {
+                break;
+            }
+        }
+
+        return builder.toString();
+    }
+}

+ 0 - 112
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/BaseLogProcessor.java

@@ -17,121 +17,9 @@
 
 package org.apache.dolphinscheduler.remote.processor;
 
-import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
-import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public abstract class BaseLogProcessor {
 
-    /**
-     * get files content bytes for download file
-     *
-     * @param filePath file path
-     * @return byte array of file
-     */
-    protected byte[] getFileContentBytesFromLocal(String filePath) {
-        try (
-                InputStream in = new FileInputStream(filePath);
-                ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
-            byte[] buf = new byte[1024];
-            int len;
-            while ((len = in.read(buf)) != -1) {
-                bos.write(buf, 0, len);
-            }
-            return bos.toByteArray();
-        } catch (IOException e) {
-            log.error("get file bytes error", e);
-        }
-        return new byte[0];
-    }
-
-    protected byte[] getFileContentBytesFromRemote(String filePath) {
-        RemoteLogUtils.getRemoteLog(filePath);
-        return getFileContentBytesFromLocal(filePath);
-    }
-
-    protected byte[] getFileContentBytes(String filePath) {
-        File file = new File(filePath);
-        if (file.exists()) {
-            return getFileContentBytesFromLocal(filePath);
-        }
-        if (RemoteLogUtils.isRemoteLoggingEnable()) {
-            return getFileContentBytesFromRemote(filePath);
-        }
-        return getFileContentBytesFromLocal(filePath);
-    }
-
-    /**
-     * read part file content,can skip any line and read some lines
-     *
-     * @param filePath file path
-     * @param skipLine skip line
-     * @param limit read lines limit
-     * @return part file content
-     */
-    protected List<String> readPartFileContentFromLocal(String filePath,
-                                                        int skipLine,
-                                                        int limit) {
-        File file = new File(filePath);
-        if (file.exists() && file.isFile()) {
-            try (Stream<String> stream = Files.lines(Paths.get(filePath))) {
-                return stream.skip(skipLine).limit(limit).collect(Collectors.toList());
-            } catch (IOException e) {
-                log.error("read file error", e);
-                throw new RuntimeException(String.format("Read file: %s error", filePath), e);
-            }
-        } else {
-            throw new RuntimeException("The file path: " + filePath + " not exists");
-        }
-    }
-
-    protected List<String> readPartFileContentFromRemote(String filePath,
-                                                         int skipLine,
-                                                         int limit) {
-        RemoteLogUtils.getRemoteLog(filePath);
-        return readPartFileContentFromLocal(filePath, skipLine, limit);
-    }
-
-    protected List<String> readPartFileContent(String filePath,
-                                               int skipLine,
-                                               int limit) {
-        File file = new File(filePath);
-        if (file.exists()) {
-            return readPartFileContentFromLocal(filePath, skipLine, limit);
-        }
-        if (RemoteLogUtils.isRemoteLoggingEnable()) {
-            return readPartFileContentFromRemote(filePath, skipLine, limit);
-        }
-        return readPartFileContentFromLocal(filePath, skipLine, limit);
-    }
-
-    protected String readWholeFileContentFromRemote(String filePath) {
-        RemoteLogUtils.getRemoteLog(filePath);
-        return LogUtils.readWholeFileContentFromLocal(filePath);
-    }
-
-    protected String readWholeFileContent(String filePath) {
-        File file = new File(filePath);
-        if (file.exists()) {
-            return LogUtils.readWholeFileContentFromLocal(filePath);
-        }
-        if (RemoteLogUtils.isRemoteLoggingEnable()) {
-            return readWholeFileContentFromRemote(filePath);
-        }
-        return LogUtils.readWholeFileContentFromLocal(filePath);
-    }
-
 }

+ 2 - 1
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/GetLogBytesProcessor.java

@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.remote.processor;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LogUtils;
 import org.apache.dolphinscheduler.remote.command.Message;
 import org.apache.dolphinscheduler.remote.command.MessageType;
 import org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequest;
@@ -38,7 +39,7 @@ public class GetLogBytesProcessor extends BaseLogProcessor implements NettyReque
         GetLogBytesRequest getLogRequest = JSONUtils.parseObject(
                 message.getBody(), GetLogBytesRequest.class);
         String path = getLogRequest.getPath();
-        byte[] bytes = getFileContentBytes(path);
+        byte[] bytes = LogUtils.getFileContentBytes(path);
         GetLogBytesResponse getLogResponse = new GetLogBytesResponse(bytes);
         channel.writeAndFlush(getLogResponse.convert2Command(message.getOpaque()));
     }

+ 5 - 23
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/RollViewLogRequest.java

@@ -18,11 +18,11 @@
 package org.apache.dolphinscheduler.remote.processor;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LogUtils;
 import org.apache.dolphinscheduler.remote.command.Message;
 import org.apache.dolphinscheduler.remote.command.MessageType;
 import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponse;
 
-import java.nio.charset.StandardCharsets;
 import java.util.List;
 
 import lombok.extern.slf4j.Slf4j;
@@ -42,30 +42,12 @@ public class RollViewLogRequest extends BaseLogProcessor implements NettyRequest
 
         String rollViewLogPath = rollViewLogRequest.getPath();
 
-        List<String> lines = readPartFileContent(rollViewLogPath,
+        List<String> lines = LogUtils.readPartFileContent(rollViewLogPath,
                 rollViewLogRequest.getSkipLineNum(), rollViewLogRequest.getLimit());
-        StringBuilder builder = new StringBuilder();
-        final int MaxResponseLogSize = 65535;
-        int totalLogByteSize = 0;
-        for (String line : lines) {
-            // If a single line of log is exceed max response size, cut off the line
-            final int lineByteSize = line.getBytes(StandardCharsets.UTF_8).length;
-            if (lineByteSize >= MaxResponseLogSize) {
-                builder.append(line, 0, MaxResponseLogSize)
-                        .append(" [this line's size ").append(lineByteSize).append(" bytes is exceed ")
-                        .append(MaxResponseLogSize).append(" bytes, so only ")
-                        .append(MaxResponseLogSize).append(" characters are reserved for performance reasons.]")
-                        .append("\r\n");
-            } else {
-                builder.append(line).append("\r\n");
-            }
-            totalLogByteSize += lineByteSize;
-            if (totalLogByteSize >= MaxResponseLogSize) {
-                break;
-            }
-        }
+
+        String logContent = LogUtils.rollViewLogLines(lines);
         RollViewLogResponse rollViewLogRequestResponse =
-                new RollViewLogResponse(builder.toString());
+                new RollViewLogResponse(logContent);
         channel.writeAndFlush(rollViewLogRequestResponse.convert2Command(message.getOpaque()));
     }
 

+ 2 - 1
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/ViewWholeLogProcessor.java

@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.remote.processor;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LogUtils;
 import org.apache.dolphinscheduler.remote.command.Message;
 import org.apache.dolphinscheduler.remote.command.MessageType;
 import org.apache.dolphinscheduler.remote.command.log.ViewLogRequest;
@@ -38,7 +39,7 @@ public class ViewWholeLogProcessor extends BaseLogProcessor implements NettyRequ
         ViewLogRequest viewLogRequest = JSONUtils.parseObject(
                 message.getBody(), ViewLogRequest.class);
         String viewLogPath = viewLogRequest.getPath();
-        String msg = readWholeFileContent(viewLogPath);
+        String msg = LogUtils.readWholeFileContent(viewLogPath);
         ViewLogResponseResponse viewLogResponse = new ViewLogResponseResponse(msg);
         channel.writeAndFlush(viewLogResponse.convert2Command(message.getOpaque()));
     }

+ 2 - 1
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java

@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.service.log;
 
 import static org.apache.dolphinscheduler.common.constants.Constants.APPID_COLLECT;
 import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COLLECT_WAY;
+import static org.apache.dolphinscheduler.common.utils.LogUtils.readWholeFileContentFromLocal;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
@@ -114,7 +115,7 @@ public class LogClient implements AutoCloseable {
         final Host address = new Host(host, port);
         try {
             if (NetUtils.getHost().equals(host)) {
-                return LogUtils.readWholeFileContentFromLocal(request.getPath());
+                return readWholeFileContentFromLocal(request.getPath());
             } else {
                 Message message = request.convert2Command();
                 Message response = this.client.sendSync(address, message, LOG_REQUEST_TIMEOUT);

+ 1 - 1
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LoggerRequestProcessorTest.java

@@ -18,7 +18,7 @@
 package org.apache.dolphinscheduler.service.log;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
+import org.apache.dolphinscheduler.common.utils.LogUtils;
 import org.apache.dolphinscheduler.remote.command.Message;
 import org.apache.dolphinscheduler.remote.command.MessageType;
 import org.apache.dolphinscheduler.remote.command.log.ViewLogRequest;

+ 0 - 17
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java

@@ -26,11 +26,8 @@ import org.apache.dolphinscheduler.plugin.task.api.log.TaskLogDiscriminator;
 
 import org.apache.commons.lang3.StringUtils;
 
-import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -186,20 +183,6 @@ public class LogUtils {
         }
     }
 
-    public static String readWholeFileContentFromLocal(String filePath) {
-        String line;
-        StringBuilder sb = new StringBuilder();
-        try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)))) {
-            while ((line = br.readLine()) != null) {
-                sb.append(line + "\r\n");
-            }
-            return sb.toString();
-        } catch (IOException e) {
-            log.error("read file error", e);
-        }
-        return "";
-    }
-
     public static String getTaskInstanceLogFullPathMdc() {
         return MDC.get(TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY);
     }