瀏覽代碼

[Feat][python] Add parameter environment to task (#11763)

JieguangZhou 2 年之前
父節點
當前提交
8988492c43
共有 19 個文件被更改,包括 58 次插入0 次删除
  1. 24 0
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
  2. 11 0
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
  3. 4 0
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
  4. 1 0
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py
  5. 1 0
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
  6. 1 0
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
  7. 2 0
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py
  8. 1 0
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py
  9. 1 0
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
  10. 1 0
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py
  11. 1 0
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py
  12. 1 0
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py
  13. 1 0
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
  14. 1 0
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py
  15. 3 0
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
  16. 1 0
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py
  17. 1 0
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
  18. 1 0
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py
  19. 1 0
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py

+ 24 - 0
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

@@ -30,8 +30,10 @@ import javax.annotation.PostConstruct;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.dolphinscheduler.api.configuration.PythonGatewayConfiguration;
+import org.apache.dolphinscheduler.api.dto.EnvironmentDto;
 import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
 import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.EnvironmentService;
 import org.apache.dolphinscheduler.api.service.ExecutorService;
 import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
 import org.apache.dolphinscheduler.api.service.ProjectService;
@@ -98,6 +100,9 @@ public class PythonGateway {
     @Autowired
     private TenantService tenantService;
 
+    @Autowired
+    private EnvironmentService environmentService;
+
     @Autowired
     private ExecutorService executorService;
 
@@ -562,6 +567,25 @@ public class PythonGateway {
         return result;
     }
 
+    /**
+     * Get environment info by given environment name. It return environment code.
+     * Useful in Python API create task which need environment information.
+     *
+     * @param environmentName name of the environment
+     */
+    public Long getEnvironmentInfo(String environmentName) {
+        Map<String, Object> result = environmentService.queryEnvironmentByName(environmentName);
+
+        if (result.get("data") == null) {
+            String msg = String.format("Can not find valid environment by name %s", environmentName);
+            logger.error(msg);
+            throw new IllegalArgumentException(msg);
+        }
+        EnvironmentDto environmentDto = EnvironmentDto.class.cast(result.get("data"));
+        return environmentDto.getCode();
+    }
+
+
     /**
      * Get resource by given resource type and full name. It return map contain resource id, name.
      * Useful in Python API create task which need processDefinition information.

+ 11 - 0
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py

@@ -90,6 +90,7 @@ class Task(Base):
         "flag",
         "task_priority",
         "worker_group",
+        "environment_code",
         "delay_time",
         "fail_retry_times",
         "fail_retry_interval",
@@ -110,6 +111,7 @@ class Task(Base):
         flag: Optional[str] = TaskFlag.YES,
         task_priority: Optional[str] = TaskPriority.MEDIUM,
         worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
+        environment_name: Optional[str] = None,
         delay_time: Optional[int] = 0,
         fail_retry_times: Optional[int] = 0,
         fail_retry_interval: Optional[int] = 1,
@@ -129,6 +131,7 @@ class Task(Base):
         self.flag = flag
         self.task_priority = task_priority
         self.worker_group = worker_group
+        self._environment_name = environment_name
         self.fail_retry_times = fail_retry_times
         self.fail_retry_interval = fail_retry_interval
         self.delay_time = delay_time
@@ -145,6 +148,7 @@ class Task(Base):
         # move attribute code and version after _process_definition and process_definition declare
         self.code, self.version = self.gen_code_and_version()
         # Add task to process definition, maybe we could put into property process_definition latter
+
         if (
             self.process_definition is not None
             and self.code not in self.process_definition.tasks
@@ -306,3 +310,10 @@ class Task(Base):
         # result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
         # gateway_result_checker(result)
         return result.get("code"), result.get("version")
+
+    @property
+    def environment_code(self) -> str:
+        """Convert environment name to code."""
+        if self._environment_name is None:
+            return None
+        return JavaGate().query_environment_info(self._environment_name)

+ 4 - 0
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py

@@ -98,6 +98,10 @@ class JavaGate:
         """Get resources file info through java gateway."""
         return self.java_gateway.entry_point.queryResourcesFileInfo(user_name, name)
 
+    def query_environment_info(self, name: str):
+        """Get environment info through java gateway."""
+        return self.java_gateway.entry_point.getEnvironmentInfo(name)
+
     def get_code_and_version(
         self, project_name: str, process_definition_name: str, task_name: str
     ):

+ 1 - 0
dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py

@@ -124,6 +124,7 @@ def test_property_task_params(mock_resource, mock_code_version, attr, expect):
                 "flag": "YES",
                 "taskPriority": "MEDIUM",
                 "workerGroup": "default",
+                "environmentCode": None,
                 "failRetryTimes": 0,
                 "failRetryInterval": 1,
                 "timeoutFlag": "CLOSE",

+ 1 - 0
dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py

@@ -161,6 +161,7 @@ def test_task_get_define():
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",

+ 1 - 0
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py

@@ -381,6 +381,7 @@ def test_condition_get_define(mock_condition_code_version, mock_task_code_versio
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",

+ 2 - 0
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py

@@ -67,6 +67,7 @@ def test_datax_get_define(mock_datasource):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
@@ -108,6 +109,7 @@ def test_custom_datax_get_define(json_template):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",

+ 1 - 0
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py

@@ -782,6 +782,7 @@ def test_dependent_get_define(mock_code_version, mock_dep_code):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",

+ 1 - 0
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py

@@ -68,6 +68,7 @@ def test_flink_get_define(mock_resource):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",

+ 1 - 0
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py

@@ -130,6 +130,7 @@ def test_http_get_define():
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",

+ 1 - 0
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py

@@ -61,6 +61,7 @@ def test_mr_get_define(mock_resource):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",

+ 1 - 0
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py

@@ -96,6 +96,7 @@ def test_sql_get_define(mock_datasource, mock_code_version):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",

+ 1 - 0
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py

@@ -132,6 +132,7 @@ def test_python_get_define(name, script_code, raw):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",

+ 1 - 0
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py

@@ -87,6 +87,7 @@ def test_sagemaker_get_define():
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",

+ 3 - 0
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py

@@ -62,6 +62,7 @@ def test_shell_get_define():
         "name": name,
         "version": 1,
         "description": None,
+        "environmentCode": None,
         "delayTime": 0,
         "taskType": "SHELL",
         "taskParams": {
@@ -75,6 +76,7 @@ def test_shell_get_define():
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
@@ -86,4 +88,5 @@ def test_shell_get_define():
         return_value=(code, version),
     ):
         shell = Shell(name, command)
+        print(shell.get_define())
         assert shell.get_define() == expect

+ 1 - 0
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py

@@ -68,6 +68,7 @@ def test_spark_get_define(mock_resource):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",

+ 1 - 0
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py

@@ -152,6 +152,7 @@ def test_sql_get_define(mock_datasource):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",

+ 1 - 0
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py

@@ -99,6 +99,7 @@ def test_sub_process_get_define(mock_process_definition):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",

+ 1 - 0
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py

@@ -250,6 +250,7 @@ def test_switch_get_define(mock_task_code_version):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",