|
@@ -48,6 +48,7 @@ import java.util.Collections;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.function.Consumer;
|
|
|
import java.util.regex.Matcher;
|
|
@@ -55,7 +56,6 @@ import java.util.regex.Pattern;
|
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
|
|
-
|
|
|
/**
|
|
|
* abstract command executor
|
|
|
*/
|
|
@@ -74,7 +74,7 @@ public abstract class AbstractCommandExecutor {
|
|
|
/**
|
|
|
* log handler
|
|
|
*/
|
|
|
- protected Consumer<List<String>> logHandler;
|
|
|
+ protected Consumer<LinkedBlockingQueue<String>> logHandler;
|
|
|
|
|
|
/**
|
|
|
* logger
|
|
@@ -82,9 +82,9 @@ public abstract class AbstractCommandExecutor {
|
|
|
protected Logger logger;
|
|
|
|
|
|
/**
|
|
|
- * log list
|
|
|
+ * log collection
|
|
|
*/
|
|
|
- protected final List<String> logBuffer;
|
|
|
+ protected final LinkedBlockingQueue<String> logBuffer;
|
|
|
|
|
|
protected boolean logOutputIsScuccess = false;
|
|
|
|
|
@@ -98,20 +98,16 @@ public abstract class AbstractCommandExecutor {
|
|
|
*/
|
|
|
private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
|
|
|
|
|
|
- public AbstractCommandExecutor(Consumer<List<String>> logHandler,
|
|
|
+ public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
|
|
|
TaskExecutionContext taskExecutionContext,
|
|
|
Logger logger) {
|
|
|
this.logHandler = logHandler;
|
|
|
this.taskExecutionContext = taskExecutionContext;
|
|
|
this.logger = logger;
|
|
|
- this.logBuffer = Collections.synchronizedList(new ArrayList<>());
|
|
|
+ this.logBuffer = new LinkedBlockingQueue<>();
|
|
|
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
|
|
|
}
|
|
|
|
|
|
- protected AbstractCommandExecutor(List<String> logBuffer) {
|
|
|
- this.logBuffer = logBuffer;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* build process
|
|
|
*
|
|
@@ -232,7 +228,6 @@ public abstract class AbstractCommandExecutor {
|
|
|
return varPool.toString();
|
|
|
}
|
|
|
|
|
|
-
|
|
|
/**
|
|
|
* cancel application
|
|
|
*
|
|
@@ -329,15 +324,14 @@ public abstract class AbstractCommandExecutor {
|
|
|
*/
|
|
|
private void clear() {
|
|
|
|
|
|
- List<String> markerList = new ArrayList<>();
|
|
|
- markerList.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString());
|
|
|
+ LinkedBlockingQueue<String> markerLog = new LinkedBlockingQueue<>();
|
|
|
+ markerLog.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString());
|
|
|
|
|
|
if (!logBuffer.isEmpty()) {
|
|
|
// log handle
|
|
|
logHandler.accept(logBuffer);
|
|
|
- logBuffer.clear();
|
|
|
}
|
|
|
- logHandler.accept(markerList);
|
|
|
+ logHandler.accept(markerLog);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -349,9 +343,7 @@ public abstract class AbstractCommandExecutor {
|
|
|
String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskExecutionContext.getTaskAppId());
|
|
|
ExecutorService getOutputLogService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName + "-" + "getOutputLogService");
|
|
|
getOutputLogService.submit(() -> {
|
|
|
- BufferedReader inReader = null;
|
|
|
- try {
|
|
|
- inReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
|
|
|
+ try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
|
|
|
String line;
|
|
|
logBuffer.add("welcome to use bigdata scheduling system...");
|
|
|
while ((line = inReader.readLine()) != null) {
|
|
@@ -366,7 +358,6 @@ public abstract class AbstractCommandExecutor {
|
|
|
logger.error(e.getMessage(), e);
|
|
|
} finally {
|
|
|
logOutputIsScuccess = true;
|
|
|
- close(inReader);
|
|
|
}
|
|
|
});
|
|
|
getOutputLogService.shutdown();
|
|
@@ -460,31 +451,20 @@ public abstract class AbstractCommandExecutor {
|
|
|
* @return line list
|
|
|
*/
|
|
|
private List<String> convertFile2List(String filename) {
|
|
|
- List lineList = new ArrayList<String>(100);
|
|
|
+ List<String> lineList = new ArrayList<>(100);
|
|
|
File file = new File(filename);
|
|
|
|
|
|
if (!file.exists()) {
|
|
|
return lineList;
|
|
|
}
|
|
|
|
|
|
- BufferedReader br = null;
|
|
|
- try {
|
|
|
- br = new BufferedReader(new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8));
|
|
|
+ try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8))) {
|
|
|
String line = null;
|
|
|
while ((line = br.readLine()) != null) {
|
|
|
lineList.add(line);
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
logger.error(String.format("read file: %s failed : ", filename), e);
|
|
|
- } finally {
|
|
|
- if (br != null) {
|
|
|
- try {
|
|
|
- br.close();
|
|
|
- } catch (IOException e) {
|
|
|
- logger.error(e.getMessage(), e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
}
|
|
|
return lineList;
|
|
|
}
|
|
@@ -556,27 +536,10 @@ public abstract class AbstractCommandExecutor {
|
|
|
lastFlushTime = now;
|
|
|
/** log handle */
|
|
|
logHandler.accept(logBuffer);
|
|
|
-
|
|
|
- logBuffer.clear();
|
|
|
}
|
|
|
return lastFlushTime;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * close buffer reader
|
|
|
- *
|
|
|
- * @param inReader in reader
|
|
|
- */
|
|
|
- private void close(BufferedReader inReader) {
|
|
|
- if (inReader != null) {
|
|
|
- try {
|
|
|
- inReader.close();
|
|
|
- } catch (IOException e) {
|
|
|
- logger.error(e.getMessage(), e);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
protected List<String> commandOptions() {
|
|
|
return Collections.emptyList();
|
|
|
}
|