ソースを参照

[Python] Combine gateway.entry_point call in python api side (#11330)

陈家名 2 年 前
コミット
3061bbc5c4

+ 2 - 3
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/database.py

@@ -22,7 +22,7 @@ from typing import Dict
 from py4j.protocol import Py4JJavaError
 
 from pydolphinscheduler.exceptions import PyDSParamException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 
 
 class Database(dict):
@@ -54,9 +54,8 @@ class Database(dict):
         if self._database:
             return self._database
         else:
-            gateway = launch_gateway()
             try:
-                self._database = gateway.entry_point.getDatasourceInfo(name)
+                self._database = JavaGate().get_datasource_info(name)
             # Handler database source do not exists error, for now we just terminate the process.
             except Py4JJavaError as ex:
                 raise PyDSParamException(str(ex.java_exception))

+ 2 - 3
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/engine.py

@@ -23,7 +23,7 @@ from py4j.protocol import Py4JJavaError
 
 from pydolphinscheduler.core.task import Task
 from pydolphinscheduler.exceptions import PyDSParamException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 
 
 class ProgramType(str):
@@ -62,9 +62,8 @@ class Engine(Task):
         if self._resource:
             return self._resource
         else:
-            gateway = launch_gateway()
             try:
-                self._resource = gateway.entry_point.getResourcesFileInfo(
+                self._resource = JavaGate().get_resources_file_info(
                     program_type, main_package
                 )
             # Handler source do not exists error, for now we just terminate the process.

+ 4 - 6
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py

@@ -25,7 +25,7 @@ from pydolphinscheduler import configuration
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.resource import Resource
 from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 from pydolphinscheduler.models import Base, Project, Tenant, User
 from pydolphinscheduler.utils.date import MAX_DATETIME, conv_from_str, conv_to_schedule
 
@@ -392,14 +392,12 @@ class ProcessDefinition(Base):
         self._ensure_side_model_exists()
         self._pre_submit_check()
 
-        gateway = launch_gateway()
-        self._process_definition_code = gateway.entry_point.createOrUpdateProcessDefinition(
+        self._process_definition_code = JavaGate().create_or_update_process_definition(
             self._user,
             self._project,
             self.name,
             str(self.description) if self.description else "",
             json.dumps(self.param_json),
-            json.dumps(self.schedule_json) if self.schedule_json else None,
             self.warning_type,
             self.warning_group_id,
             json.dumps(self.task_location),
@@ -410,6 +408,7 @@ class ProcessDefinition(Base):
             # TODO add serialization function
             json.dumps(self.task_relation_json),
             json.dumps(self.task_definition_json),
+            json.dumps(self.schedule_json) if self.schedule_json else None,
             None,
             None,
         )
@@ -424,8 +423,7 @@ class ProcessDefinition(Base):
 
         which post to `start-process-instance` to java gateway
         """
-        gateway = launch_gateway()
-        gateway.entry_point.execProcessInstance(
+        JavaGate().exec_process_instance(
             self._user,
             self._project,
             self.name,

+ 4 - 6
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py

@@ -20,7 +20,7 @@
 from typing import Optional
 
 from pydolphinscheduler.exceptions import PyDSParamException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 from pydolphinscheduler.models import Base
 
 
@@ -53,8 +53,7 @@ class Resource(Base):
             raise PyDSParamException(
                 "`user_name` is required when querying resources from python gate."
             )
-        gateway = launch_gateway()
-        return gateway.entry_point.queryResourcesFileInfo(self.user_name, self.name)
+        return JavaGate().query_resources_file_info(self.user_name, self.name)
 
     def get_id_from_database(self):
         """Get resource id from java gateway."""
@@ -66,10 +65,9 @@ class Resource(Base):
             raise PyDSParamException(
                 "`user_name` and `content` are required when create or update resource from python gate."
             )
-        gateway = launch_gateway()
-        gateway.entry_point.createOrUpdateResource(
+        JavaGate().create_or_update_resource(
             self.user_name,
             self.name,
-            self.description,
             self.content,
+            self.description,
         )

+ 2 - 3
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py

@@ -34,7 +34,7 @@ from pydolphinscheduler.core.process_definition import (
 )
 from pydolphinscheduler.core.resource import Resource
 from pydolphinscheduler.exceptions import PyDSParamException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 from pydolphinscheduler.models import Base
 
 logger = getLogger(__name__)
@@ -300,8 +300,7 @@ class Task(Base):
         equal to 0 by java gateway, otherwise if will return the exists code and version.
         """
         # TODO get code from specific project process definition and task name
-        gateway = launch_gateway()
-        result = gateway.entry_point.getCodeAndVersion(
+        result = JavaGate().get_code_and_version(
             self.process_definition._project, self.process_definition.name, self.name
         )
         # result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)

+ 156 - 0
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py

@@ -63,3 +63,159 @@ def gateway_result_checker(
     ):
         raise PyDSJavaGatewayException("Get result state not success.")
     return result
+
+
+class JavaGate:
+    """Launch java gateway to pydolphin scheduler."""
+
+    def __init__(
+        self,
+        address: Optional[str] = None,
+        port: Optional[int] = None,
+        auto_convert: Optional[bool] = True,
+    ):
+        self.java_gateway = launch_gateway(address, port, auto_convert)
+
+    def get_datasource_info(self, name: str):
+        """Get datasource info through java gateway."""
+        return self.java_gateway.entry_point.getDatasourceInfo(name)
+
+    def get_resources_file_info(self, program_type: str, main_package: str):
+        """Get resources file info through java gateway."""
+        return self.java_gateway.entry_point.getResourcesFileInfo(
+            program_type, main_package
+        )
+
+    def create_or_update_resource(
+        self, user_name: str, name: str, content: str, description: Optional[str] = None
+    ):
+        """Create or update resource through java gateway."""
+        return self.java_gateway.entry_point.createOrUpdateResource(
+            user_name, name, description, content
+        )
+
+    def query_resources_file_info(self, user_name: str, name: str):
+        """Get resources file info through java gateway."""
+        return self.java_gateway.entry_point.queryResourcesFileInfo(user_name, name)
+
+    def get_code_and_version(
+        self, project_name: str, process_definition_name: str, task_name: str
+    ):
+        """Get code and version through java gateway."""
+        return self.java_gateway.entry_point.getCodeAndVersion(
+            project_name, process_definition_name, task_name
+        )
+
+    def create_or_grant_project(
+        self, user: str, name: str, description: Optional[str] = None
+    ):
+        """Create or grant project through java gateway."""
+        return self.java_gateway.entry_point.createOrGrantProject(
+            user, name, description
+        )
+
+    def create_tenant(
+        self, tenant_name: str, queue_name: str, description: Optional[str] = None
+    ):
+        """Create tenant through java gateway."""
+        return self.java_gateway.entry_point.createTenant(
+            tenant_name, description, queue_name
+        )
+
+    def create_user(
+        self,
+        name: str,
+        password: str,
+        email: str,
+        phone: str,
+        tenant: str,
+        queue: str,
+        status: int,
+    ):
+        """Create user through java gateway."""
+        return self.java_gateway.entry_point.createUser(
+            name, password, email, phone, tenant, queue, status
+        )
+
+    def get_dependent_info(
+        self,
+        project_name: str,
+        process_definition_name: str,
+        task_name: Optional[str] = None,
+    ):
+        """Get dependent info through java gateway."""
+        return self.java_gateway.entry_point.getDependentInfo(
+            project_name, process_definition_name, task_name
+        )
+
+    def get_process_definition_info(
+        self, user_name: str, project_name: str, process_definition_name: str
+    ):
+        """Get process definition info through java gateway."""
+        return self.java_gateway.entry_point.getProcessDefinitionInfo(
+            user_name, project_name, process_definition_name
+        )
+
+    def create_or_update_process_definition(
+        self,
+        user_name: str,
+        project_name: str,
+        name: str,
+        description: str,
+        global_params: str,
+        warning_type: str,
+        warning_group_id: int,
+        locations: str,
+        timeout: int,
+        worker_group: str,
+        tenant_code: str,
+        release_state: int,
+        task_relation_json: str,
+        task_definition_json: str,
+        schedule: Optional[str] = None,
+        other_params_json: Optional[str] = None,
+        execution_type: Optional[str] = None,
+    ):
+        """Create or update process definition through java gateway."""
+        return self.java_gateway.entry_point.createOrUpdateProcessDefinition(
+            user_name,
+            project_name,
+            name,
+            description,
+            global_params,
+            schedule,
+            warning_type,
+            warning_group_id,
+            locations,
+            timeout,
+            worker_group,
+            tenant_code,
+            release_state,
+            task_relation_json,
+            task_definition_json,
+            other_params_json,
+            execution_type,
+        )
+
+    def exec_process_instance(
+        self,
+        user_name: str,
+        project_name: str,
+        process_definition_name: str,
+        cron_time: str,
+        worker_group: str,
+        warning_type: str,
+        warning_group_id: int,
+        timeout: int,
+    ):
+        """Exec process instance through java gateway."""
+        return self.java_gateway.entry_point.execProcessInstance(
+            user_name,
+            project_name,
+            process_definition_name,
+            cron_time,
+            worker_group,
+            warning_type,
+            warning_group_id,
+            timeout,
+        )

+ 2 - 3
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/project.py

@@ -20,7 +20,7 @@
 from typing import Optional
 
 from pydolphinscheduler import configuration
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 from pydolphinscheduler.models import BaseSide
 
 
@@ -36,7 +36,6 @@ class Project(BaseSide):
 
     def create_if_not_exists(self, user=configuration.USER_NAME) -> None:
         """Create Project if not exists."""
-        gateway = launch_gateway()
-        gateway.entry_point.createOrGrantProject(user, self.name, self.description)
+        JavaGate().create_or_grant_project(user, self.name, self.description)
         # TODO recover result checker
         # gateway_result_checker(result, None)

+ 0 - 8
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/queue.py

@@ -20,7 +20,6 @@
 from typing import Optional
 
 from pydolphinscheduler import configuration
-from pydolphinscheduler.java_gateway import gateway_result_checker, launch_gateway
 from pydolphinscheduler.models import BaseSide
 
 
@@ -33,10 +32,3 @@ class Queue(BaseSide):
         description: Optional[str] = "",
     ):
         super().__init__(name, description)
-
-    def create_if_not_exists(self, user=configuration.USER_NAME) -> None:
-        """Create Queue if not exists."""
-        gateway = launch_gateway()
-        # Here we set Queue.name and Queue.queueName same as self.name
-        result = gateway.entry_point.createProject(user, self.name, self.name)
-        gateway_result_checker(result, None)

+ 2 - 3
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/tenant.py

@@ -20,7 +20,7 @@
 from typing import Optional
 
 from pydolphinscheduler import configuration
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 from pydolphinscheduler.models import BaseSide
 
 
@@ -40,6 +40,5 @@ class Tenant(BaseSide):
         self, queue_name: str, user=configuration.USER_NAME
     ) -> None:
         """Create Tenant if not exists."""
-        gateway = launch_gateway()
-        gateway.entry_point.createTenant(self.name, self.description, queue_name)
+        JavaGate().create_tenant(self.name, queue_name, self.description)
         # gateway_result_checker(result, None)

+ 2 - 3
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/user.py

@@ -20,7 +20,7 @@
 from typing import Optional
 
 from pydolphinscheduler import configuration
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 from pydolphinscheduler.models import BaseSide, Tenant
 
 
@@ -64,8 +64,7 @@ class User(BaseSide):
         """Create User if not exists."""
         # Should make sure queue already exists.
         self.create_tenant_if_not_exists()
-        gateway = launch_gateway()
-        gateway.entry_point.createUser(
+        JavaGate().create_user(
             self.name,
             self.password,
             self.email,

+ 2 - 3
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dependent.py

@@ -22,7 +22,7 @@ from typing import Dict, Optional, Tuple
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.task import Task
 from pydolphinscheduler.exceptions import PyDSJavaGatewayException, PyDSParamException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 from pydolphinscheduler.models.base import Base
 
 DEPENDENT_ALL_TASK_IN_WORKFLOW = "0"
@@ -165,9 +165,8 @@ class DependentItem(Base):
         if self._code:
             return self._code
         else:
-            gateway = launch_gateway()
             try:
-                self._code = gateway.entry_point.getDependentInfo(*self.code_parameter)
+                self._code = JavaGate().get_dependent_info(*self.code_parameter)
                 return self._code
             except Exception:
                 raise PyDSJavaGatewayException("Function get_code_from_gateway error.")

+ 2 - 3
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py

@@ -22,7 +22,7 @@ from typing import Dict
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.task import Task
 from pydolphinscheduler.exceptions import PyDSProcessDefinitionNotAssignException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 
 
 class SubProcess(Task):
@@ -47,8 +47,7 @@ class SubProcess(Task):
             raise PyDSProcessDefinitionNotAssignException(
                 "ProcessDefinition must be provider for task SubProcess."
             )
-        gateway = launch_gateway()
-        return gateway.entry_point.getProcessDefinitionInfo(
+        return JavaGate().get_process_definition_info(
             self.process_definition.user.name,
             self.process_definition.project.name,
             process_definition_name,