
[python] Add task base database and procedure (#7279)

We add a new Procedure task, and add a parent class Database
for both the SQL task and the Procedure task.

fix: #6929
Jiajie Zhong, 3 years ago · parent · commit 1948030151
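
Below is a minimal usage sketch of the new Procedure task (not part of this commit). The process definition name, tenant, and datasource name are illustrative, and the named datasource must already be registered on a DolphinScheduler server reachable through the Python gateway.

```python
# Illustrative usage of the new Procedure task; names are placeholders and a
# running DolphinScheduler Python gateway is assumed.
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.procedure import Procedure

with ProcessDefinition(name="procedure_example", tenant="tenant_exists") as pd:
    Procedure(
        name="call_hello_world",
        datasource_name="test_datasource",
        sql="call HelloWorld();",
    )
    pd.run()
```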

+ 1 - 0
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py

@@ -72,6 +72,7 @@ class TaskType(str):
     PYTHON = "PYTHON"
     SQL = "SQL"
     SUB_PROCESS = "SUB_PROCESS"
+    PROCEDURE = "PROCEDURE"
 
 
 class DefaultTaskCodeNum(str):

+ 83 - 0
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/database.py

@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task database base task."""
+
+from typing import Dict
+
+from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.java_gateway import launch_gateway
+
+
+class Database(Task):
+    """Base task to handle database, declare behavior for the base handler of database.
+
+    It a parent class for all database task of dolphinscheduler. And it should run sql like
+    job in multiply sql lik engine, such as:
+    - ClickHouse
+    - DB2
+    - HIVE
+    - MySQL
+    - Oracle
+    - Postgresql
+    - Presto
+    - SQLServer
+    The ``datasource_name`` you provide points to a datasource holding the connection
+    information; it decides which database type and database instance will run this SQL.
+    """
+
+    _task_custom_attr = {"sql"}
+
+    def __init__(
+        self, task_type: str, name: str, datasource_name: str, sql: str, *args, **kwargs
+    ):
+        super().__init__(name, task_type, *args, **kwargs)
+        self.datasource_name = datasource_name
+        self.sql = sql
+        self._datasource = {}
+
+    def get_datasource_type(self) -> str:
+        """Get datasource type from java gateway, a wrapper for :func:`get_datasource_info`."""
+        return self.get_datasource_info(self.datasource_name).get("type")
+
+    def get_datasource_id(self) -> str:
+        """Get datasource id from java gateway, a wrapper for :func:`get_datasource_info`."""
+        return self.get_datasource_info(self.datasource_name).get("id")
+
+    def get_datasource_info(self, name) -> Dict:
+        """Get datasource info from java gateway, contains datasource id, type, name."""
+        if self._datasource:
+            return self._datasource
+        else:
+            gateway = launch_gateway()
+            self._datasource = gateway.entry_point.getDatasourceInfo(name)
+            return self._datasource
+
+    @property
+    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
+        """Override Task.task_params for sql task.
+
+        Sql task have some specials attribute for task_params, and is odd if we
+        directly set as python property, so we Override Task.task_params here.
+        """
+        params = super().task_params
+        custom_params = {
+            "type": self.get_datasource_type(),
+            "datasource": self.get_datasource_id(),
+        }
+        params.update(custom_params)
+        return params
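
Note that `get_datasource_info` caches the gateway response in `_datasource`, so the two wrapper methods cost at most one gateway round trip per task. A quick sketch of that behavior (not part of this commit), patching `gen_code_and_version` the way the new tests do and patching `launch_gateway` where the database module imports it:

```python
from unittest.mock import patch

from pydolphinscheduler.tasks.database import Database

with patch(
    "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(123, 1)
), patch("pydolphinscheduler.tasks.database.launch_gateway") as mock_gateway:
    mock_gateway.return_value.entry_point.getDatasourceInfo.return_value = {
        "id": 1,
        "type": "MYSQL",
    }
    task = Database("SQL", "cache-demo", "test_datasource", "select 1")
    task.get_datasource_type()
    task.get_datasource_id()
    # Both wrappers share one cached lookup, so the gateway is hit only once.
    assert mock_gateway.return_value.entry_point.getDatasourceInfo.call_count == 1
```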

+ 43 - 0
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/procedure.py

@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task procedure."""
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.tasks.database import Database
+
+
+class Procedure(Database):
+    """Task Procedure object, declare behavior for Procedure task to dolphinscheduler.
+
+    It should run database procedure jobs in multiple SQL-like engines, such as:
+    - ClickHouse
+    - DB2
+    - HIVE
+    - MySQL
+    - Oracle
+    - Postgresql
+    - Presto
+    - SQLServer
+    The ``datasource_name`` you provide points to a datasource holding the connection
+    information; it decides which database type and database instance will run this procedure.
+    """
+
+    def __init__(self, name: str, datasource_name: str, sql: str, *args, **kwargs):
+        super().__init__(
+            TaskType.PROCEDURE, name, datasource_name, sql, *args, **kwargs
+        )

+ 4 - 40
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py

@@ -18,11 +18,10 @@
 """Task sql."""
 
 import re
-from typing import Dict, Optional
+from typing import Optional
 
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.tasks.database import Database
 
 
 class SqlType:
@@ -32,7 +31,7 @@ class SqlType:
     NOT_SELECT = 1
 
 
-class Sql(Task):
+class Sql(Database):
     """Task SQL object, declare behavior for SQL task to dolphinscheduler.
 
     It should run sql job in multiply sql lik engine, such as:
@@ -67,30 +66,10 @@ class Sql(Task):
         *args,
         **kwargs
     ):
-        super().__init__(name, TaskType.SQL, *args, **kwargs)
-        self.datasource_name = datasource_name
-        self.sql = sql
+        super().__init__(TaskType.SQL, name, datasource_name, sql, *args, **kwargs)
         self.pre_statements = pre_statements or []
         self.post_statements = post_statements or []
         self.display_rows = display_rows
-        self._datasource = {}
-
-    def get_datasource_type(self) -> str:
-        """Get datasource type from java gateway, a wrapper for :func:`get_datasource_info`."""
-        return self.get_datasource_info(self.datasource_name).get("type")
-
-    def get_datasource_id(self) -> str:
-        """Get datasource id from java gateway, a wrapper for :func:`get_datasource_info`."""
-        return self.get_datasource_info(self.datasource_name).get("id")
-
-    def get_datasource_info(self, name) -> Dict:
-        """Get datasource info from java gateway, contains datasource id, type, name."""
-        if self._datasource:
-            return self._datasource
-        else:
-            gateway = launch_gateway()
-            self._datasource = gateway.entry_point.getDatasourceInfo(name)
-            return self._datasource
 
     @property
     def sql_type(self) -> int:
@@ -103,18 +82,3 @@ class Sql(Task):
             return SqlType.NOT_SELECT
         else:
             return SqlType.SELECT
-
-    @property
-    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
-        """Override Task.task_params for sql task.
-
-        Sql task have some specials attribute for task_params, and is odd if we
-        directly set as python property, so we Override Task.task_params here.
-        """
-        params = super().task_params
-        custom_params = {
-            "type": self.get_datasource_type(),
-            "datasource": self.get_datasource_id(),
-        }
-        params.update(custom_params)
-        return params
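
With this refactor, Sql keeps only its SQL-specific options (pre/post statements, display rows, and select detection) and inherits datasource resolution from Database. A minimal construction sketch (not part of this commit; names are illustrative and, as with any task, a reachable DolphinScheduler Python gateway is assumed):

```python
from pydolphinscheduler.tasks.sql import Sql

# Illustrative names; "mysql_datasource" must already be registered on the server.
task = Sql(
    name="query_users",
    datasource_name="mysql_datasource",
    sql="select id, name from t_user limit 10",
    pre_statements=["set time_zone = '+00:00'"],
)
```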

+ 127 - 0
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_database.py

@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task Database."""
+
+
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.tasks.database import Database
+
+TEST_DATABASE_TASK_TYPE = "SQL"
+TEST_DATABASE_SQL = "select 1"
+TEST_DATABASE_DATASOURCE_NAME = "test_datasource"
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch(
+    "pydolphinscheduler.tasks.database.Database.get_datasource_info",
+    return_value=({"id": 1, "type": "mock_type"}),
+)
+def test_get_datasource_detail(mock_datasource, mock_code_version):
+    """Test :func:`get_datasource_type` and :func:`get_datasource_id` can return expect value."""
+    name = "test_get_database_detail"
+    task = Database(
+        TEST_DATABASE_TASK_TYPE, name, TEST_DATABASE_DATASOURCE_NAME, TEST_DATABASE_SQL
+    )
+    assert 1 == task.get_datasource_id()
+    assert "mock_type" == task.get_datasource_type()
+
+
+@pytest.mark.parametrize(
+    "attr, expect",
+    [
+        (
+            {
+                "task_type": TEST_DATABASE_TASK_TYPE,
+                "name": "test-task-params",
+                "datasource_name": TEST_DATABASE_DATASOURCE_NAME,
+                "sql": TEST_DATABASE_SQL,
+            },
+            {
+                "type": "MYSQL",
+                "datasource": 1,
+                "sql": TEST_DATABASE_SQL,
+                "localParams": [],
+                "resourceList": [],
+                "dependence": {},
+                "waitStartTimeout": {},
+                "conditionResult": {"successNode": [""], "failedNode": [""]},
+            },
+        )
+    ],
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch(
+    "pydolphinscheduler.tasks.database.Database.get_datasource_info",
+    return_value=({"id": 1, "type": "MYSQL"}),
+)
+def test_property_task_params(mock_datasource, mock_code_version, attr, expect):
+    """Test task database task property."""
+    task = Database(**attr)
+    assert expect == task.task_params
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch(
+    "pydolphinscheduler.tasks.database.Database.get_datasource_info",
+    return_value=({"id": 1, "type": "MYSQL"}),
+)
+def test_database_get_define(mock_datasource, mock_code_version):
+    """Test task database function get_define."""
+    name = "test_database_get_define"
+    expect = {
+        "code": 123,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": TEST_DATABASE_TASK_TYPE,
+        "taskParams": {
+            "type": "MYSQL",
+            "datasource": 1,
+            "sql": TEST_DATABASE_SQL,
+            "localParams": [],
+            "resourceList": [],
+            "dependence": {},
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    task = Database(
+        TEST_DATABASE_TASK_TYPE, name, TEST_DATABASE_DATASOURCE_NAME, TEST_DATABASE_SQL
+    )
+    assert task.get_define() == expect

+ 122 - 0
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py

@@ -0,0 +1,122 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task Procedure."""
+
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.tasks.procedure import Procedure
+
+TEST_PROCEDURE_SQL = (
+    'create procedure HelloWorld() select "hello world"; call HelloWorld();'
+)
+TEST_PROCEDURE_DATASOURCE_NAME = "test_datasource"
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch(
+    "pydolphinscheduler.tasks.procedure.Procedure.get_datasource_info",
+    return_value=({"id": 1, "type": "mock_type"}),
+)
+def test_get_datasource_detail(mock_datasource, mock_code_version):
+    """Test :func:`get_datasource_type` and :func:`get_datasource_id` can return expect value."""
+    name = "test_get_datasource_detail"
+    task = Procedure(name, TEST_PROCEDURE_DATASOURCE_NAME, TEST_PROCEDURE_SQL)
+    assert 1 == task.get_datasource_id()
+    assert "mock_type" == task.get_datasource_type()
+
+
+@pytest.mark.parametrize(
+    "attr, expect",
+    [
+        (
+            {
+                "name": "test-procedure-task-params",
+                "datasource_name": TEST_PROCEDURE_DATASOURCE_NAME,
+                "sql": TEST_PROCEDURE_SQL,
+            },
+            {
+                "sql": TEST_PROCEDURE_SQL,
+                "type": "MYSQL",
+                "datasource": 1,
+                "localParams": [],
+                "resourceList": [],
+                "dependence": {},
+                "waitStartTimeout": {},
+                "conditionResult": {"successNode": [""], "failedNode": [""]},
+            },
+        )
+    ],
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch(
+    "pydolphinscheduler.tasks.procedure.Procedure.get_datasource_info",
+    return_value=({"id": 1, "type": "MYSQL"}),
+)
+def test_property_task_params(mock_datasource, mock_code_version, attr, expect):
+    """Test task sql task property."""
+    task = Procedure(**attr)
+    assert expect == task.task_params
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch(
+    "pydolphinscheduler.tasks.procedure.Procedure.get_datasource_info",
+    return_value=({"id": 1, "type": "MYSQL"}),
+)
+def test_procedure_get_define(mock_datasource, mock_code_version):
+    """Test task procedure function get_define."""
+    name = "test_procedure_get_define"
+    expect = {
+        "code": 123,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": "PROCEDURE",
+        "taskParams": {
+            "type": "MYSQL",
+            "datasource": 1,
+            "sql": TEST_PROCEDURE_SQL,
+            "localParams": [],
+            "resourceList": [],
+            "dependence": {},
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    task = Procedure(name, TEST_PROCEDURE_DATASOURCE_NAME, TEST_PROCEDURE_SQL)
+    assert task.get_define() == expect

+ 1 - 1
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py

@@ -126,7 +126,7 @@ def test_sql_get_define(mock_datasource):
     """Test task sql function get_define."""
     code = 123
     version = 1
-    name = "test_sql_dict"
+    name = "test_sql_get_define"
     command = "select 1"
     datasource_name = "test_datasource"
     expect = {