|
@@ -20,10 +20,16 @@ package org.apache.dolphinscheduler.api.service.impl;
|
|
|
import org.apache.dolphinscheduler.api.enums.Status;
|
|
|
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
|
|
|
import org.apache.dolphinscheduler.api.service.LoggerService;
|
|
|
+import org.apache.dolphinscheduler.api.service.ProjectService;
|
|
|
import org.apache.dolphinscheduler.api.utils.Result;
|
|
|
import org.apache.dolphinscheduler.common.Constants;
|
|
|
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
|
|
|
+import org.apache.dolphinscheduler.dao.entity.Project;
|
|
|
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
|
|
|
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
|
|
|
+import org.apache.dolphinscheduler.dao.entity.User;
|
|
|
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
|
|
|
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
|
|
|
import org.apache.dolphinscheduler.remote.utils.Host;
|
|
|
import org.apache.dolphinscheduler.service.log.LogClientService;
|
|
|
import org.apache.dolphinscheduler.service.process.ProcessService;
|
|
@@ -31,6 +37,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
|
|
|
import java.nio.charset.StandardCharsets;
|
|
|
+import java.util.Map;
|
|
|
import java.util.Objects;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
@@ -47,7 +54,7 @@ import com.google.common.primitives.Bytes;
|
|
|
* logger service impl
|
|
|
*/
|
|
|
@Service
|
|
|
-public class LoggerServiceImpl implements LoggerService {
|
|
|
+public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService {
|
|
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(LoggerServiceImpl.class);
|
|
|
|
|
@@ -58,6 +65,15 @@ public class LoggerServiceImpl implements LoggerService {
|
|
|
|
|
|
private LogClientService logClient;
|
|
|
|
|
|
+ @Autowired
|
|
|
+ ProjectMapper projectMapper;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ ProjectService projectService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ TaskDefinitionMapper taskDefinitionMapper;
|
|
|
+
|
|
|
@PostConstruct
|
|
|
public void init() {
|
|
|
if (Objects.isNull(this.logClient)) {
|
|
@@ -89,10 +105,117 @@ public class LoggerServiceImpl implements LoggerService {
|
|
|
if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) {
|
|
|
return Result.error(Status.TASK_INSTANCE_NOT_FOUND);
|
|
|
}
|
|
|
+ Result<String> result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
|
|
|
+ String log = queryLog(taskInstance,skipLineNum,limit);
|
|
|
+ result.setData(log);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
|
|
|
- String host = getHost(taskInstance.getHost());
|
|
|
|
|
|
- Result<String> result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
|
|
|
+ /**
|
|
|
+ * get log size
|
|
|
+ *
|
|
|
+ * @param taskInstId task instance id
|
|
|
+ * @return log byte array
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public byte[] getLogBytes(int taskInstId) {
|
|
|
+ TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
|
|
|
+ if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) {
|
|
|
+ throw new ServiceException("task instance is null or host is null");
|
|
|
+ }
|
|
|
+ return getLogBytes(taskInstance);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * query log
|
|
|
+ *
|
|
|
+ * @param loginUser login user
|
|
|
+ * @param projectCode project code
|
|
|
+ * @param taskInstId task instance id
|
|
|
+ * @param skipLineNum skip line number
|
|
|
+ * @param limit limit
|
|
|
+ * @return log string data
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ public Map<String, Object> queryLog(User loginUser, long projectCode, int taskInstId, int skipLineNum, int limit) {
|
|
|
+ Project project = projectMapper.queryByCode(projectCode);
|
|
|
+ //check user access for project
|
|
|
+ Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
|
|
|
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ // check whether the task instance can be found
|
|
|
+ TaskInstance task = processService.findTaskInstanceById(taskInstId);
|
|
|
+ if (task == null || StringUtils.isBlank(task.getHost())) {
|
|
|
+ putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(task.getTaskCode());
|
|
|
+ if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) {
|
|
|
+ putMsg(result, Status.TASK_INSTANCE_NOT_FOUND, taskInstId);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ String log = queryLog(task, skipLineNum, limit);
|
|
|
+ result.put(Constants.DATA_LIST, log);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * get log bytes
|
|
|
+ *
|
|
|
+ * @param loginUser login user
|
|
|
+ * @param projectCode project code
|
|
|
+ * @param taskInstId task instance id
|
|
|
+ * @return log byte array
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public byte[] getLogBytes(User loginUser, long projectCode, int taskInstId) {
|
|
|
+ Project project = projectMapper.queryByCode(projectCode);
|
|
|
+ //check user access for project
|
|
|
+ Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
|
|
|
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
|
|
|
+ throw new ServiceException("user has no permission");
|
|
|
+ }
|
|
|
+ // check whether the task instance can be found
|
|
|
+ TaskInstance task = processService.findTaskInstanceById(taskInstId);
|
|
|
+ if (task == null || StringUtils.isBlank(task.getHost())) {
|
|
|
+ throw new ServiceException("task instance is null or host is null");
|
|
|
+ }
|
|
|
+
|
|
|
+ TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(task.getTaskCode());
|
|
|
+ if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) {
|
|
|
+ throw new ServiceException("task instance does not exist in project");
|
|
|
+ }
|
|
|
+ return getLogBytes(task);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * get host
|
|
|
+ *
|
|
|
+ * @param address address
|
|
|
+ * @return old version return true ,otherwise return false
|
|
|
+ */
|
|
|
+ private String getHost(String address) {
|
|
|
+ if (Boolean.TRUE.equals(Host.isOldVersion(address))) {
|
|
|
+ return address;
|
|
|
+ }
|
|
|
+ return Host.of(address).getIp();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * query log
|
|
|
+ *
|
|
|
+ * @param taskInstance task instance
|
|
|
+ * @param skipLineNum skip line number
|
|
|
+ * @param limit limit
|
|
|
+ * @return log string data
|
|
|
+ */
|
|
|
+ private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) {
|
|
|
+
|
|
|
+ String host = getHost(taskInstance.getHost());
|
|
|
|
|
|
logger.info("log host : {} , logPath : {} , logServer port : {}", host, taskInstance.getLogPath(),
|
|
|
PropertyUtils.getInt(Constants.RPC_PORT, 50051));
|
|
@@ -109,23 +232,16 @@ public class LoggerServiceImpl implements LoggerService {
|
|
|
log.append(logClient
|
|
|
.rollViewLog(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), taskInstance.getLogPath(), skipLineNum, limit));
|
|
|
|
|
|
- result.setData(log.toString());
|
|
|
- return result;
|
|
|
+ return log.toString();
|
|
|
}
|
|
|
|
|
|
-
|
|
|
/**
|
|
|
- * get log size
|
|
|
+ * get log bytes
|
|
|
*
|
|
|
- * @param taskInstId task instance id
|
|
|
+ * @param taskInstance task instance
|
|
|
* @return log byte array
|
|
|
*/
|
|
|
- @Override
|
|
|
- public byte[] getLogBytes(int taskInstId) {
|
|
|
- TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
|
|
|
- if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) {
|
|
|
- throw new ServiceException("task instance is null or host is null");
|
|
|
- }
|
|
|
+ private byte[] getLogBytes(TaskInstance taskInstance) {
|
|
|
String host = getHost(taskInstance.getHost());
|
|
|
byte[] head = String.format(LOG_HEAD_FORMAT,
|
|
|
taskInstance.getLogPath(),
|
|
@@ -134,17 +250,4 @@ public class LoggerServiceImpl implements LoggerService {
|
|
|
return Bytes.concat(head,
|
|
|
logClient.getLogBytes(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), taskInstance.getLogPath()));
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * get host
|
|
|
- *
|
|
|
- * @param address address
|
|
|
- * @return old version return true ,otherwise return false
|
|
|
- */
|
|
|
- private String getHost(String address) {
|
|
|
- if (Boolean.TRUE.equals(Host.isOldVersion(address))) {
|
|
|
- return address;
|
|
|
- }
|
|
|
- return Host.of(address).getIp();
|
|
|
- }
|
|
|
}
|