Browse Source

Tgt invalid #742 fix,Concurrent task log bug #730 fix,logback.xml set log path support #723 fix (#761)

* mission log disorder,bug #751 fix

* the log path of the task and the log path of the task instance are different. The log cannot be viewed. #723

* the log path of the task and the log path of the task instance are different. The log cannot be viewed. #723 bug fix

* after starting kerberos authentication, tgt expires after one day,bug #742 fix

* log pattern modify

* LoggerServer remove comment code and ShellCommandExecutor modify

* PythonCommandExecutor modify

* Concurrent task log bug #730 fix

* remove invalid commit
乔占卫 5 years ago
parent
commit
e2cf4c50b9

+ 4 - 0
escheduler-common/src/main/java/cn/escheduler/common/utils/HadoopUtils.java

@@ -66,6 +66,10 @@ public class HadoopUtils implements Closeable {
     }
 
     public static HadoopUtils getInstance(){
+        // if kerberos startup , renew HadoopUtils
+        if (CommonUtils.getKerberosStartupState()){
+            return new HadoopUtils();
+        }
         return instance;
     }
 

+ 2 - 31
escheduler-server/src/main/java/cn/escheduler/server/rpc/LoggerServer.java

@@ -100,13 +100,7 @@ public class LoggerServer {
             StringBuilder sb = new StringBuilder();
             boolean errorLineFlag = false;
             for (String line : list){
-                if (line.contains("TaskLogger")){
-                    errorLineFlag = filterLine(request.getPath(),line);
-                }
-
-                if (!errorLineFlag || !line.contains("TaskLogger")){
-                    sb.append(line + "\r\n");
-                }
+                sb.append(line + "\r\n");
             }
             RetStrInfo retInfoBuild = RetStrInfo.newBuilder().setMsg(sb.toString()).build();
             responseObserver.onNext(retInfoBuild);
@@ -204,13 +198,7 @@ public class LoggerServer {
             br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
             boolean errorLineFlag = false;
             while ((line = br.readLine()) != null){
-                if (line.contains("TaskLogger")){
-                    errorLineFlag = filterLine(path,line);
-                }
-
-                if (!errorLineFlag || !line.contains("TaskLogger")){
-                    sb.append(line + "\r\n");
-                }
+                sb.append(line + "\r\n");
             }
 
             return sb.toString();
@@ -228,21 +216,4 @@ public class LoggerServer {
         return null;
     }
 
-
-    /**
-     *
-     * @param path
-     * @param line
-     * @return
-     */
-    private static boolean filterLine(String path,String line){
-        String removeSuffix = path.substring(0, path.length() - 4);
-        String[] strArrs = removeSuffix.split("/");
-        String taskAppId = String.format("%s_%s_%s",
-                strArrs[strArrs.length - 3],
-                strArrs[strArrs.length-2],
-                strArrs[strArrs.length - 1]);
-        return !line.contains(taskAppId);
-    }
-
 }

+ 5 - 2
escheduler-server/src/main/java/cn/escheduler/server/utils/LoggerUtils.java

@@ -37,7 +37,9 @@ public class LoggerUtils {
     /**
      * Task Logger's prefix
      */
-    public static final String TASK_LOGGER_INFO_PREFIX = "TaskLogInfo";
+    public static final String TASK_LOGGER_INFO_PREFIX = "TASK";
+
+    public static final String TASK_LOGGER_THREAD_NAME = "TaskLogInfo";
 
     /**
      *  build job id
@@ -51,7 +53,8 @@ public class LoggerUtils {
                                   int processDefId,
                                   int processInstId,
                                   int taskId){
-        return String.format("%s-%s/%s/%s",affix,
+        // - [taskAppId=TASK_79_4084_15210]
+        return String.format(" - [taskAppId=%s-%s-%s-%s]",affix,
                 processDefId,
                 processInstId,
                 taskId);

+ 18 - 3
escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogDiscriminator.java

@@ -18,21 +18,27 @@ package cn.escheduler.server.worker.log;
 
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.sift.AbstractDiscriminator;
+import cn.escheduler.common.Constants;
 import cn.escheduler.server.utils.LoggerUtils;
 
 public class TaskLogDiscriminator extends AbstractDiscriminator<ILoggingEvent> {
 
     private String key;
 
+    private String logBase;
+
     /**
      * logger name should be like:
-     *     Task Logger name should be like: TaskLogInfo-{processDefinitionId}/{processInstanceId}/{taskInstanceId}
+     *     Task Logger name should be like: Task-{processDefinitionId}-{processInstanceId}-{taskInstanceId}
      */
+    @Override
     public String getDiscriminatingValue(ILoggingEvent event) {
-        String loggerName = event.getLoggerName();
+        String loggerName = event.getLoggerName()
+                .split(Constants.EQUAL_SIGN)[1];
         String prefix = LoggerUtils.TASK_LOGGER_INFO_PREFIX + "-";
         if (loggerName.startsWith(prefix)) {
-            return loggerName.substring(prefix.length());
+            return loggerName.substring(prefix.length(),
+                    loggerName.length() - 1).replace("-","/");
         } else {
             return "unknown_task";
         }
@@ -43,6 +49,7 @@ public class TaskLogDiscriminator extends AbstractDiscriminator<ILoggingEvent> {
         started = true;
     }
 
+    @Override
     public String getKey() {
         return key;
     }
@@ -50,4 +57,12 @@ public class TaskLogDiscriminator extends AbstractDiscriminator<ILoggingEvent> {
     public void setKey(String key) {
         this.key = key;
     }
+
+    public String getLogBase() {
+        return logBase;
+    }
+
+    public void setLogBase(String logBase) {
+        this.logBase = logBase;
+    }
 }

+ 1 - 1
escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogFilter.java

@@ -28,7 +28,7 @@ public class TaskLogFilter extends Filter<ILoggingEvent> {
 
     @Override
     public FilterReply decide(ILoggingEvent event) {
-        if (event.getLoggerName().startsWith(LoggerUtils.TASK_LOGGER_INFO_PREFIX)) {
+        if (event.getThreadName().startsWith(LoggerUtils.TASK_LOGGER_THREAD_NAME)) {
             return FilterReply.ACCEPT;
         }
         return FilterReply.DENY;

+ 15 - 2
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java

@@ -17,6 +17,8 @@
 package cn.escheduler.server.worker.runner;
 
 
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.sift.SiftingAppender;
 import cn.escheduler.common.Constants;
 import cn.escheduler.common.enums.ExecutionStatus;
 import cn.escheduler.common.enums.TaskRecordStatus;
@@ -40,6 +42,7 @@ import cn.escheduler.dao.model.TaskInstance;
 import cn.escheduler.dao.model.Tenant;
 import cn.escheduler.server.utils.LoggerUtils;
 import cn.escheduler.server.utils.ParamUtils;
+import cn.escheduler.server.worker.log.TaskLogDiscriminator;
 import cn.escheduler.server.worker.task.AbstractTask;
 import cn.escheduler.server.worker.task.TaskManager;
 import cn.escheduler.server.worker.task.TaskProps;
@@ -109,7 +112,7 @@ public class TaskScheduleThread implements Runnable {
 
             // get tenant info
             Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(),
-                                                    processDefine.getUserId());
+                    processDefine.getUserId());
 
             if(tenant == null){
                 logger.error("cannot find the tenant, process definition id:{}, user id:{}",
@@ -220,8 +223,18 @@ public class TaskScheduleThread implements Runnable {
      * @return
      */
     private String getTaskLogPath() {
+        String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory())
+                .getLogger("ROOT")
+                .getAppender("TASKLOGFILE"))
+                .getDiscriminator()).getLogBase();
+        if (baseLog.startsWith(Constants.SINGLE_SLASH)){
+            return baseLog + Constants.SINGLE_SLASH +
+                    taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH  +
+                    taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH  +
+                    taskInstance.getId() + ".log";
+        }
         return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
-                "logs" +  Constants.SINGLE_SLASH +
+                baseLog +  Constants.SINGLE_SLASH +
                 taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH  +
                 taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH  +
                 taskInstance.getId() + ".log";

+ 3 - 6
escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java

@@ -22,6 +22,7 @@ import cn.escheduler.common.thread.ThreadUtils;
 import cn.escheduler.common.utils.HadoopUtils;
 import cn.escheduler.dao.ProcessDao;
 import cn.escheduler.dao.model.TaskInstance;
+import cn.escheduler.server.utils.LoggerUtils;
 import cn.escheduler.server.utils.ProcessUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -347,7 +348,7 @@ public abstract class AbstractCommandExecutor {
      * get the standard output of the process
      */
     private void parseProcessOutput(Process process) {
-        String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskAppId);
+        String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskAppId);
         ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName);
         parseProcessOutputExecutorService.submit(new Runnable(){
             @Override
@@ -361,10 +362,7 @@ public abstract class AbstractCommandExecutor {
                     long lastFlushTime = System.currentTimeMillis();
 
                     while ((line = inReader.readLine()) != null) {
-                        if(checkShowLog(line)){
-                            logBuffer.add(line);
-                        }
-
+                        logBuffer.add(line);
                         lastFlushTime = flush(lastFlushTime);
                     }
                 } catch (Exception e) {
@@ -566,7 +564,6 @@ public abstract class AbstractCommandExecutor {
 
     protected abstract String buildCommandFilePath();
     protected abstract String commandType();
-    protected abstract boolean checkShowLog(String line);
     protected abstract boolean checkFindApp(String line);
     protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException;
 }

+ 0 - 5
escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java

@@ -100,11 +100,6 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
         return pythonHome;
     }
 
-    @Override
-    protected boolean checkShowLog(String line) {
-        return true;
-    }
-
     @Override
     protected boolean checkFindApp(String line) {
         return true;

+ 0 - 5
escheduler-server/src/main/java/cn/escheduler/server/worker/task/ShellCommandExecutor.java

@@ -60,11 +60,6 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
         return SH;
     }
 
-    @Override
-    protected boolean checkShowLog(String line) {
-        return line.contains(taskAppId) || !line.contains("cn.escheduler.server.worker.log.TaskLogger");
-    }
-
     @Override
     protected boolean checkFindApp(String line) {
         return line.contains(taskAppId);

+ 1 - 0
escheduler-server/src/main/resources/worker_logback.xml

@@ -16,6 +16,7 @@
         <filter class="cn.escheduler.server.worker.log.TaskLogFilter"></filter>
         <Discriminator class="cn.escheduler.server.worker.log.TaskLogDiscriminator">
             <key>taskAppId</key>
+            <logBase>${log.base}</logBase>
         </Discriminator>
         <sift>
             <appender name="FILE-${taskAppId}" class="ch.qos.logback.core.FileAppender">