|
@@ -17,11 +17,13 @@
|
|
|
|
|
|
package org.apache.dolphinscheduler.server;
|
|
|
|
|
|
+import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
|
|
|
import org.apache.dolphinscheduler.api.enums.Status;
|
|
|
import org.apache.dolphinscheduler.api.service.ExecutorService;
|
|
|
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
|
|
|
import org.apache.dolphinscheduler.api.service.ProjectService;
|
|
|
import org.apache.dolphinscheduler.api.service.QueueService;
|
|
|
+import org.apache.dolphinscheduler.api.service.ResourcesService;
|
|
|
import org.apache.dolphinscheduler.api.service.SchedulerService;
|
|
|
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
|
|
|
import org.apache.dolphinscheduler.api.service.TenantService;
|
|
@@ -31,6 +33,7 @@ import org.apache.dolphinscheduler.common.Constants;
|
|
|
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
|
|
|
import org.apache.dolphinscheduler.common.enums.Priority;
|
|
|
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
|
|
|
+import org.apache.dolphinscheduler.common.enums.ProgramType;
|
|
|
import org.apache.dolphinscheduler.common.enums.ReleaseState;
|
|
|
import org.apache.dolphinscheduler.common.enums.RunMode;
|
|
|
import org.apache.dolphinscheduler.common.enums.TaskDependType;
|
|
@@ -50,11 +53,13 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
|
|
|
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
|
|
|
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
|
|
|
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
|
|
|
+import org.apache.dolphinscheduler.spi.enums.ResourceType;
|
|
|
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Objects;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
|
|
@@ -68,10 +73,12 @@ import org.springframework.context.annotation.ComponentScan;
|
|
|
|
|
|
import py4j.GatewayServer;
|
|
|
|
|
|
+import org.apache.commons.collections.CollectionUtils;
|
|
|
+
|
|
|
@SpringBootApplication
|
|
|
@ComponentScan(value = "org.apache.dolphinscheduler")
|
|
|
public class PythonGatewayServer extends SpringBootServletInitializer {
|
|
|
- private static final Logger LOGGER = LoggerFactory.getLogger(PythonGatewayServer.class);
|
|
|
+ private static final Logger logger = LoggerFactory.getLogger(PythonGatewayServer.class);
|
|
|
|
|
|
private static final WarningType DEFAULT_WARNING_TYPE = WarningType.NONE;
|
|
|
private static final int DEFAULT_WARNING_GROUP_ID = 0;
|
|
@@ -107,6 +114,9 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
|
|
|
@Autowired
|
|
|
private QueueService queueService;
|
|
|
|
|
|
+ @Autowired
|
|
|
+ private ResourcesService resourceService;
|
|
|
+
|
|
|
@Autowired
|
|
|
private ProjectMapper projectMapper;
|
|
|
|
|
@@ -244,7 +254,7 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
|
|
|
processDefinition = processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName);
|
|
|
} else if (verifyStatus != Status.SUCCESS) {
|
|
|
String msg = "Verify process definition exists status is invalid, neither SUCCESS or PROCESS_DEFINITION_NAME_EXIST.";
|
|
|
- LOGGER.error(msg);
|
|
|
+ logger.error(msg);
|
|
|
throw new RuntimeException(msg);
|
|
|
}
|
|
|
|
|
@@ -465,6 +475,30 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Get resource by given program type and full name. It return map contain resource id, name.
|
|
|
+ * Useful in Python API create flink task which need processDefinition information.
|
|
|
+ *
|
|
|
+ * @param programType program type one of SCALA, JAVA and PYTHON
|
|
|
+ * @param fullName full name of the resource
|
|
|
+ */
|
|
|
+ public Map<String, Object> getResourcesFileInfo(String programType, String fullName) {
|
|
|
+ Map<String, Object> result = new HashMap<>();
|
|
|
+
|
|
|
+ Map<String, Object> resources = resourceService.queryResourceByProgramType(dummyAdminUser, ResourceType.FILE, ProgramType.valueOf(programType));
|
|
|
+ List<ResourceComponent> resourcesComponent = (List<ResourceComponent>) resources.get(Constants.DATA_LIST);
|
|
|
+ List<ResourceComponent> namedResources = resourcesComponent.stream().filter(s -> fullName.equals(s.getFullName())).collect(Collectors.toList());
|
|
|
+ if (CollectionUtils.isEmpty(namedResources)) {
|
|
|
+ String msg = String.format("Can not find valid resource by program type %s and name %s", programType, fullName);
|
|
|
+ logger.error(msg);
|
|
|
+ throw new IllegalArgumentException(msg);
|
|
|
+ }
|
|
|
+
|
|
|
+ result.put("id", namedResources.get(0).getId());
|
|
|
+ result.put("name", namedResources.get(0).getName());
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
@PostConstruct
|
|
|
public void run() {
|
|
|
GatewayServer server = new GatewayServer(this);
|